@@ -1,8 +1,207 @@
//! Task execution utilities.
//!
//! In the Tokio execution model, futures are lazy. When a future is created, no
//! work is performed. In order for the work defined by the future to happen,
//! the future must be submitted to an executor. A future that is submitted to
//! an executor is called a "task".
//!
//! The executor is responsible for ensuring that [`Future::poll`] is
//! called whenever the task is [notified]. Notification happens when the
//! internal state of a task transitions from "not ready" to ready. For
//! example, a socket might have received data and a call to `read` will now be
//! able to succeed.
//!
//! The specific strategy used to manage the tasks is left up to the
//! executor. There are two main flavors of executors: single-threaded and
//! multithreaded. This module provides both.
//!
//! * **[`current_thread`]**: A single-threaded executor that supports spawning
//! tasks that are not `Send`. It guarantees that tasks will be executed on
//! the same thread from which they are spawned.
//!
//! * **[`thread_pool`]**: A multi-threaded executor that maintains a pool of
//! threads. Tasks are spawned to one of the threads in the pool and executed.
//! The pool employs a [work-stealing] strategy for optimizing how tasks get
//! spread across the available threads.
//!
//! # The `Executor` trait
//!
//! This module provides the [`Executor`] trait (re-exported from
//! [`tokio-executor`]), which describes the API that all executors must
//! implement.
//!
//! A free [`spawn`] function is provided that allows spawning futures onto the
//! default executor (tracked via a thread-local variable) without referencing a
//! handle. It is expected that all executors will set a value for the default
//! executor. This value will often be set to the executor itself, but it is
//! possible that the default executor might be set to a different executor.
//!
//! For example, the [`current_thread`] executor might set the default executor
//! to a thread pool instead of itself, allowing futures to spawn new tasks onto
//! the thread pool when those tasks are `Send`.
//!
//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
//! [notified]: https://docs.rs/futures/0.1/futures/executor/trait.Notify.html#tymethod.notify
//! [`current_thread`]: current_thread/index.html
//! [`thread_pool`]: thread_pool/index.html
//! [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
//! [`tokio-executor`]: #
//! [`Executor`]: #
//! [`spawn`]: #
pub mod current_thread;
mod scheduler;
mod sleep;
pub mod thread_pool {
    //! Maintains a pool of threads across which the set of spawned tasks are
    //! executed.
    //!
    //! [`ThreadPool`] is an executor that uses a thread pool for executing
    //! tasks concurrently across multiple cores. It uses a thread pool that is
    //! optimized for use cases that involve multiplexing a large number of
    //! independent tasks that perform short(ish) amounts of computation and are
    //! mainly waiting on I/O, i.e. the Tokio use case.
    //!
    //! Usually, users of [`ThreadPool`] will not create pool instances.
    //! Instead, they will create a [`Runtime`] instance, which comes with a
    //! pre-configured thread pool.
    //!
    //! At the core, [`ThreadPool`] uses a work-stealing based scheduling
    //! strategy. When spawning a task while *external* to the thread pool
    //! (i.e., from a thread that is not part of the thread pool), the task is
    //! randomly assigned to a worker thread. When spawning a task while
    //! *internal* to the thread pool, the task is assigned to the current
    //! worker.
    //!
    //! Each worker maintains its own queue and first focuses on processing all
    //! tasks in its queue. When the worker's queue is empty, the worker will
    //! attempt to *steal* tasks from other worker queues. This strategy helps
    //! ensure that work is evenly distributed across threads while minimizing
    //! synchronization between worker threads.
    //!
    //! # Usage
    //!
    //! Thread pool instances are created using [`ThreadPool::new`] or
    //! [`Builder::new`]. The first option returns a thread pool with default
    //! configuration values. The second option allows configuring the thread
    //! pool before instantiating it.
    //!
    //! Once an instance is obtained, futures may be spawned onto it using the
    //! [`spawn`] function.
    //!
    //! A handle to the thread pool is obtained using [`ThreadPool::sender`].
    //! This handle is **only** able to spawn futures onto the thread pool. It
    //! is unable to affect the lifecycle of the thread pool in any way. This
    //! handle can be passed into functions or stored in structs as a way to
    //! grant the capability of spawning futures.
    //!
    //! # Examples
    //!
    //! ```rust
    //! # extern crate tokio;
    //! # extern crate futures;
    //! # use tokio::executor::thread_pool::ThreadPool;
    //! use futures::future::{Future, lazy};
    //!
    //! # pub fn main() {
    //! // Create a thread pool with default configuration values
    //! let thread_pool = ThreadPool::new();
    //!
    //! thread_pool.spawn(lazy(|| {
    //!     println!("called from a worker thread");
    //!     Ok(())
    //! }));
    //!
    //! // Gracefully shutdown the threadpool
    //! thread_pool.shutdown().wait().unwrap();
    //! # }
    //! ```
    //!
    //! [`ThreadPool`]: struct.ThreadPool.html
    //! [`ThreadPool::new`]: struct.ThreadPool.html#method.new
    //! [`ThreadPool::sender`]: struct.ThreadPool.html#method.sender
    //! [`spawn`]: struct.ThreadPool.html#method.spawn
    //! [`Builder::new`]: struct.Builder.html#method.new
    //! [`Runtime`]: ../../runtime/struct.Runtime.html

    pub use tokio_threadpool::{
        Builder,
        Sender,
        Shutdown,
        ThreadPool,
    };
}
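The work-stealing scheme described in the `thread_pool` docs can be sketched with std primitives alone. This is an assumed simplification, not the real implementation: `tokio-threadpool` uses lock-free deques, while this sketch guards each worker queue with a `Mutex` and hard-codes two workers. All tasks land on worker 0's queue; worker 1 only makes progress by stealing from the back of it.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Two workers, each with its own queue; all tasks start on queue 0.
    let queues: Vec<Arc<Mutex<VecDeque<usize>>>> = (0..2)
        .map(|_| Arc::new(Mutex::new(VecDeque::new())))
        .collect();

    for task in 0..8 {
        queues[0].lock().unwrap().push_back(task);
    }

    let handles: Vec<_> = (0..2).map(|id| {
        let queues = queues.clone();
        thread::spawn(move || {
            let mut done = 0;
            loop {
                // Pop from the front of the local queue first; each lock is
                // released before the other queue's lock is taken.
                let local = queues[id].lock().unwrap().pop_front();
                let task = match local {
                    Some(t) => Some(t),
                    // Local queue empty: steal from the back of the other one.
                    None => queues[1 - id].lock().unwrap().pop_back(),
                };
                match task {
                    Some(_) => done += 1,
                    None => return done,
                }
            }
        })
    }).collect();

    // Every task is popped exactly once across both workers.
    let total: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total, 8);
}
```

Stealing from the *back* while the owner pops from the *front* is the usual deque arrangement: it keeps the owner and the thief contending on opposite ends of the queue.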
pub use tokio_executor::{Executor, DefaultExecutor, SpawnError};

use futures::{Future, Poll, Async};

/// Future, returned by `spawn`, that completes once the future is spawned.
///
/// See [`spawn`] for more details.
///
/// [`spawn`]: fn.spawn.html
#[derive(Debug)]
#[must_use = "Spawn does nothing unless polled"]
pub struct Spawn<F>(Option<F>);

/// Spawns a future on the default executor.
///
/// In order for a future to do work, it must be spawned on an executor. The
/// `spawn` function is the easiest way to do this. It spawns a future on the
/// [default executor] for the current execution context (tracked using a
/// thread-local variable).
///
/// The default executor is **usually** a thread pool.
///
/// Note that the function doesn't immediately spawn the future. Instead, it
/// returns `Spawn`, which itself is a future that completes once the spawn has
/// succeeded.
///
/// # Examples
///
/// In this example, a server is started and `spawn` is used to start a new task
/// that processes each received connection.
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{Future, Stream};
/// use tokio::net::TcpListener;
///
/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
/// # unimplemented!();
/// # }
/// # fn dox() {
/// # let addr = "127.0.0.1:8080".parse().unwrap();
/// let listener = TcpListener::bind(&addr).unwrap();
///
/// let server = listener.incoming()
///     .map_err(|e| println!("error = {:?}", e))
///     .for_each(|socket| {
///         tokio::spawn(process(socket))
///     });
///
/// tokio::run(server);
/// # }
/// # pub fn main() {}
/// ```
///
/// [default executor]: struct.DefaultExecutor.html
pub fn spawn<F>(f: F) -> Spawn<F>
    where F: Future<Item = (), Error = ()> + 'static + Send
{
    Spawn(Some(f))
}

impl<F> Future for Spawn<F>
    where F: Future<Item = (), Error = ()> + Send + 'static
{
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        ::tokio_executor::spawn(self.0.take().unwrap());
        Ok(Async::Ready(()))
    }
}
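The `Spawn` future relies on a one-shot `Option::take` pattern: the stored future is moved out on the first poll and handed to the executor, leaving `None` behind. A minimal std-only sketch of that pattern, with a plain closure standing in for the future and "spawning" reduced to a call (an illustration, not the real `tokio` type):

```rust
// Simplified stand-in for `Spawn`: holds a one-shot payload.
struct Spawn<F>(Option<F>);

impl<F: FnOnce()> Spawn<F> {
    // The stored "future" is consumed on the first poll; polling again
    // would panic on the `unwrap`, mirroring the real one-shot contract.
    fn poll(&mut self) {
        (self.0.take().unwrap())();
    }
}

fn main() {
    let mut s = Spawn(Some(|| println!("handed off to the executor")));
    s.poll();
    assert!(s.0.is_none()); // the payload has been moved out
}
```

The `#[must_use]` attribute on the real `Spawn` exists precisely because nothing happens until this single poll runs.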
@@ -39,27 +39,24 @@
//!
//! ```no_run
//! extern crate futures;
//! extern crate tokio;
//! extern crate tokio_io;
//!
//! use futures::prelude::*;
//! use tokio_io::AsyncRead;
//! use tokio_io::io::copy;
//! use tokio::net::TcpListener;
//!
//! fn main() {
//!     // Bind the server's socket.
//!     let addr = "127.0.0.1:12345".parse().unwrap();
//!     let listener = TcpListener::bind(&addr)
//!         .expect("unable to bind TCP listener");
//!
//!     // Pull out a stream of sockets for incoming connections
//!     let server = listener.incoming()
//!         .map_err(|e| println!("accept failed = {:?}", e))
//!         .for_each(|sock| {
//!             // Split up the reading and writing parts of the
//!             // socket.
//!             let (reader, writer) = sock.split();
@@ -76,13 +73,11 @@
//!             });
//!
//!             // Spawn the future as a concurrent task.
//!             tokio::spawn(handle_conn)
//!         });
//!
//!     // Start the Tokio runtime
//!     tokio::run(server);
//! }
//! ```
@@ -99,10 +94,16 @@ extern crate mio;
extern crate slab;
#[macro_use]
extern crate tokio_io;
extern crate tokio_executor;
extern crate tokio_threadpool;
#[macro_use]
extern crate log;
pub mod executor;
pub mod net;
pub mod reactor;
pub mod runtime;
pub use executor::spawn;
pub use runtime::run;
@@ -0,0 +1,209 @@
use std::io;
use std::thread;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;

use reactor::{Reactor, Handle};

use futures::{Future, Async, Poll};
use futures::task::AtomicTask;

/// Handle to the reactor running on a background thread.
#[derive(Debug)]
pub struct Background {
    /// When `None`, the reactor thread will run until the process terminates.
    inner: Option<Inner>,
}

/// Future that resolves when the reactor thread has shutdown.
#[derive(Debug)]
pub struct Shutdown {
    inner: Inner,
}

/// Actual Background handle.
#[derive(Debug)]
struct Inner {
    /// Handle to the reactor
    handle: Handle,

    /// Shared state between the background handle and the reactor thread.
    shared: Arc<Shared>,
}

#[derive(Debug)]
struct Shared {
    /// Signal the reactor thread to shutdown.
    shutdown: AtomicUsize,

    /// Task to notify when the reactor thread enters a shutdown state.
    shutdown_task: AtomicTask,
}

/// Notifies the reactor thread to shutdown once the reactor becomes idle.
const SHUTDOWN_IDLE: usize = 1;

/// Notifies the reactor thread to shutdown immediately.
const SHUTDOWN_NOW: usize = 2;

/// The reactor is currently shutdown.
const SHUTDOWN: usize = 3;
// ===== impl Background =====
impl Background {
    /// Launch a reactor in the background and return a handle to the thread.
    pub fn new(reactor: Reactor) -> io::Result<Background> {
        // Grab a handle to the reactor
        let handle = reactor.handle().clone();

        // Create the state shared between the background handle and the
        // reactor thread.
        let shared = Arc::new(Shared {
            shutdown: AtomicUsize::new(0),
            shutdown_task: AtomicTask::new(),
        });

        // For the reactor thread
        let shared2 = shared.clone();

        // Start the reactor thread
        thread::Builder::new()
            .spawn(move || run(reactor, shared2))?;

        Ok(Background {
            inner: Some(Inner {
                handle,
                shared,
            }),
        })
    }

    /// Returns a reference to the reactor handle.
    pub fn handle(&self) -> &Handle {
        &self.inner.as_ref().unwrap().handle
    }

    /// Shutdown the reactor on idle.
    ///
    /// Returns a future that completes once the reactor thread has shutdown.
    pub fn shutdown_on_idle(mut self) -> Shutdown {
        let inner = self.inner.take().unwrap();
        inner.shutdown_on_idle();

        Shutdown { inner }
    }

    /// Shutdown the reactor immediately.
    ///
    /// Returns a future that completes once the reactor thread has shutdown.
    pub fn shutdown_now(mut self) -> Shutdown {
        let inner = self.inner.take().unwrap();
        inner.shutdown_now();

        Shutdown { inner }
    }

    /// Run the reactor on its thread until the process terminates.
    pub fn forget(mut self) {
        drop(self.inner.take());
    }
}
impl Drop for Background {
    fn drop(&mut self) {
        let inner = match self.inner.take() {
            Some(i) => i,
            None => return,
        };

        let shutdown = Shutdown { inner };
        let _ = shutdown.wait();
    }
}

// ===== impl Shutdown =====

impl Future for Shutdown {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        self.inner.shared.shutdown_task.register();

        if !self.inner.is_shutdown() {
            return Ok(Async::NotReady);
        }

        Ok(().into())
    }
}
// ===== impl Inner =====
impl Inner {
    /// Returns true if the reactor thread is shutdown.
    fn is_shutdown(&self) -> bool {
        self.shared.shutdown.load(SeqCst) == SHUTDOWN
    }

    /// Notify the reactor thread to shutdown once the reactor transitions to
    /// an idle state.
    fn shutdown_on_idle(&self) {
        self.shared.shutdown
            .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
        self.handle.wakeup();
    }

    /// Notify the reactor thread to shutdown immediately.
    fn shutdown_now(&self) {
        let mut curr = self.shared.shutdown.load(SeqCst);

        loop {
            if curr >= SHUTDOWN_NOW {
                return;
            }

            let act = self.shared.shutdown
                .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);

            if act == curr {
                self.handle.wakeup();
                return;
            }

            curr = act;
        }
    }
}
// ===== impl Reactor thread =====
fn run(mut reactor: Reactor, shared: Arc<Shared>) {
    debug!("starting background reactor");
    loop {
        let shutdown = shared.shutdown.load(SeqCst);

        if shutdown == SHUTDOWN_NOW {
            debug!("shutting background reactor down NOW");
            break;
        }

        if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
            debug!("shutting background reactor on idle");
            break;
        }

        reactor.turn(None).unwrap();
    }

    drop(reactor);

    // Transition the state to shutdown
    shared.shutdown.store(SHUTDOWN, SeqCst);

    // Notify any waiters
    shared.shutdown_task.notify();

    debug!("background reactor has shutdown");
}
@@ -0,0 +1,362 @@
//! A batteries included runtime for applications using Tokio.
//!
//! Applications using Tokio require some runtime support in order to work:
//!
//! * A [reactor] to drive I/O resources.
//! * An [executor] to execute tasks that use these I/O resources.
//!
//! While it is possible to set up each component manually, this involves a
//! bunch of boilerplate.
//!
//! [`Runtime`] bundles all of these various runtime components into a single
//! handle that can be started and shutdown together, eliminating the necessary
//! boilerplate to run a Tokio application.
//!
//! Most applications won't need to use [`Runtime`] directly. Instead, they will
//! use the [`run`] function, which uses [`Runtime`] under the hood.
//!
//! Creating a [`Runtime`] does the following:
//!
//! * Spawn a background thread running a [`Reactor`] instance.
//! * Start a [`ThreadPool`] for executing futures.
//!
//! The thread pool uses a work-stealing strategy and is configured to start a
//! worker thread for each CPU core available on the system. This tends to be
//! the ideal setup for Tokio applications.
//!
//! # Usage
//!
//! Most applications will use the [`run`] function. This takes a future to
//! "seed" the application, blocking the thread until the runtime becomes
//! [idle].
//!
//! ```rust
//! # extern crate tokio;
//! # extern crate futures;
//! # use futures::{Future, Stream};
//! use tokio::net::TcpListener;
//!
//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
//! # unimplemented!();
//! # }
//! # fn dox() {
//! # let addr = "127.0.0.1:8080".parse().unwrap();
//! let listener = TcpListener::bind(&addr).unwrap();
//!
//! let server = listener.incoming()
//!     .map_err(|e| println!("error = {:?}", e))
//!     .for_each(|socket| {
//!         tokio::spawn(process(socket))
//!     });
//!
//! tokio::run(server);
//! # }
//! # pub fn main() {}
//! ```
//!
//! Here, the `run` function blocks until the runtime becomes idle. See
//! [`shutdown_on_idle`][idle] for more shutdown details.
//!
//! From within the context of the runtime, additional tasks are spawned using
//! the [`tokio::spawn`] function. Futures spawned using this function will be
//! executed on the same thread pool used by the [`Runtime`].
//!
//! A [`Runtime`] instance can also be used directly.
//!
//! ```rust
//! # extern crate tokio;
//! # extern crate futures;
//! # use futures::{Future, Stream};
//! use tokio::runtime::Runtime;
//! use tokio::net::TcpListener;
//!
//! # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
//! # unimplemented!();
//! # }
//! # fn dox() {
//! # let addr = "127.0.0.1:8080".parse().unwrap();
//! let listener = TcpListener::bind(&addr).unwrap();
//!
//! let server = listener.incoming()
//!     .map_err(|e| println!("error = {:?}", e))
//!     .for_each(|socket| {
//!         tokio::spawn(process(socket))
//!     });
//!
//! // Create the runtime
//! let mut rt = Runtime::new().unwrap();
//!
//! // Spawn the server task
//! rt.spawn(server);
//!
//! // Wait until the runtime becomes idle and shut it down.
//! rt.shutdown_on_idle()
//!     .wait().unwrap();
//! # }
//! # pub fn main() {}
//! ```
//!
//! [reactor]: ../reactor/struct.Reactor.html
//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors
//! [`Runtime`]: struct.Runtime.html
//! [`ThreadPool`]: ../executor/thread_pool/struct.ThreadPool.html
//! [`run`]: fn.run.html
//! [idle]: struct.Runtime.html#method.shutdown_on_idle
//! [`tokio::spawn`]: ../executor/fn.spawn.html
use reactor::{self, Reactor, Handle};
use reactor::background::Background;
use tokio_threadpool::{self as threadpool, ThreadPool};
use futures::Poll;
use futures::future::Future;
use std::{fmt, io};
/// Handle to the Tokio runtime.
///
/// The Tokio runtime includes a reactor as well as an executor for running
/// tasks.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
#[derive(Debug)]
pub struct Runtime {
    inner: Option<Inner>,
}

/// A future that resolves when the Tokio `Runtime` is shut down.
pub struct Shutdown {
    inner: Box<Future<Item = (), Error = ()> + Send>,
}

#[derive(Debug)]
struct Inner {
    /// Reactor running on a background thread.
    reactor: Background,

    /// Task execution pool.
    pool: ThreadPool,
}
// ===== impl Runtime =====
/// Start the Tokio runtime using the supplied future to bootstrap execution.
///
/// This function is used to bootstrap the execution of a Tokio application. It
/// does the following:
///
/// * Start the Tokio runtime using a default configuration.
/// * Spawn the given future onto the thread pool.
/// * Block the current thread until the runtime shuts down.
///
/// Note that the function will not return immediately once `future` has
/// completed. Instead it waits for the entire runtime to become idle.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{Future, Stream};
/// use tokio::net::TcpListener;
///
/// # fn process<T>(_: T) -> Box<Future<Item = (), Error = ()> + Send> {
/// # unimplemented!();
/// # }
/// # fn dox() {
/// # let addr = "127.0.0.1:8080".parse().unwrap();
/// let listener = TcpListener::bind(&addr).unwrap();
///
/// let server = listener.incoming()
///     .map_err(|e| println!("error = {:?}", e))
///     .for_each(|socket| {
///         tokio::spawn(process(socket))
///     });
///
/// tokio::run(server);
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if called from the context of an executor.
///
/// [mod]: ../index.html
pub fn run<F>(future: F)
    where F: Future<Item = (), Error = ()> + Send + 'static,
{
    let mut runtime = Runtime::new().unwrap();
    runtime.spawn(future);
    runtime.shutdown_on_idle().wait().unwrap();
}
impl Runtime {
    /// Create a new runtime instance with default configuration values.
    ///
    /// See [module level][mod] documentation for more details.
    ///
    /// [mod]: index.html
    pub fn new() -> io::Result<Self> {
        // Spawn a reactor on a background thread.
        let reactor = Reactor::new()?.background()?;

        // Get a handle to the reactor.
        let handle = reactor.handle().clone();

        let pool = threadpool::Builder::new()

> **@olix0r** (Member, Feb 15, 2018): we probably want a way to configure the upper bounds of number of threads this threadpool may run?

            .around_worker(move |w, enter| {
                reactor::with_default(&handle, enter, |_| {
                    w.run();
                });
            })
            .build();

        Ok(Runtime {
            inner: Some(Inner {
                reactor,
                pool,
            }),
        })
    }
    /// Return a reference to the reactor handle for this runtime instance.
    pub fn handle(&self) -> &Handle {
        self.inner.as_ref().unwrap().reactor.handle()
    }

    /// Spawn a future onto the Tokio runtime.
    ///
    /// This spawns the given future onto the runtime's executor, usually a
    /// thread pool. The thread pool is then responsible for polling the future
    /// until it completes.
    ///
    /// See [module level][mod] documentation for more details.
    ///
    /// [mod]: index.html
    ///
    /// # Examples
    ///
    /// ```rust
    /// # extern crate tokio;
    /// # extern crate futures;
    /// # use futures::{future, Future, Stream};
    /// use tokio::runtime::Runtime;
    ///
    /// # fn dox() {
    /// // Create the runtime
    /// let mut rt = Runtime::new().unwrap();
    ///
    /// // Spawn a future onto the runtime
    /// rt.spawn(future::lazy(|| {
    ///     println!("now running on a worker thread");
    ///     Ok(())
    /// }));
    /// # }
    /// # pub fn main() {}
    /// ```
    ///
    /// # Panics
    ///
    /// This function panics if the spawn fails. Failure occurs if the executor
    /// is currently at capacity and is unable to spawn a new future.
    pub fn spawn<F>(&mut self, future: F) -> &mut Self
        where F: Future<Item = (), Error = ()> + Send + 'static,
    {
        self.inner_mut().pool.sender().spawn(future).unwrap();
        self
    }

    /// Signals the runtime to shutdown once it becomes idle.
    ///
    /// Returns a future that completes once the shutdown operation has
    /// completed.
    ///
    /// This function can be used to perform a graceful shutdown of the
    /// runtime.
    ///
    /// The runtime enters an idle state once **all** of the following occur:
    ///
    /// * The thread pool has no tasks to execute, i.e., all tasks that were
    ///   spawned have completed.
    /// * The reactor is not managing any I/O resources.
    ///
    /// See [module level][mod] documentation for more details.
    ///
    /// [mod]: index.html
    pub fn shutdown_on_idle(mut self) -> Shutdown {
        let inner = self.inner.take().unwrap();

        let inner = Box::new({
            let pool = inner.pool;
            let reactor = inner.reactor;

            pool.shutdown_on_idle().and_then(|_| {
                reactor.shutdown_on_idle()
            })
        });

        Shutdown { inner }
    }

    /// Signals the runtime to shutdown immediately.
    ///
    /// Returns a future that completes once the shutdown operation has
    /// completed.
    ///
    /// This function will forcibly shutdown the runtime, causing any
    /// in-progress work to become canceled. The shutdown steps are:
    ///
    /// * Drain any scheduled work queues.
    /// * Drop any futures that have not yet completed.
    /// * Drop the reactor.
    ///
    /// Once the reactor has dropped, any outstanding I/O resources bound to
    /// that reactor will no longer function. Calling any method on them will
    /// result in an error.
    ///
    /// See [module level][mod] documentation for more details.
    ///
    /// [mod]: index.html
    pub fn shutdown_now(mut self) -> Shutdown {
        let inner = self.inner.take().unwrap();

        let inner = Box::new({
            let pool = inner.pool;
            let reactor = inner.reactor;

            pool.shutdown_now().and_then(|_| {
                reactor.shutdown_now()
            })
        });

        Shutdown { inner }
    }

    fn inner_mut(&mut self) -> &mut Inner {
        self.inner.as_mut().unwrap()
    }
}
// ===== impl Shutdown =====

impl Future for Shutdown {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        try_ready!(self.inner.poll());
        Ok(().into())
    }
}

impl fmt::Debug for Shutdown {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Shutdown")
            .field("inner", &"Box<Future<Item = (), Error = ()>>")
            .finish()
    }
}
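The `shutdown_on_idle` chaining above enforces an ordering: the thread pool drains first, then the reactor stops. A hypothetical std-only model of that two-phase ordering, using channels and threads in place of the real pool and reactor (names and structure are illustrative, not the tokio API):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (pool_idle_tx, pool_idle_rx) = mpsc::channel();

    // Stand-in for the thread pool: finishes its queued work, then
    // reports that it has become idle.
    let pool = thread::spawn(move || {
        // ... queued tasks run to completion here ...
        pool_idle_tx.send(()).unwrap();
    });

    // Stand-in for the background reactor: it only shuts down after the
    // pool has reported idle, matching `pool.shutdown_on_idle()
    // .and_then(|_| reactor.shutdown_on_idle())`.
    let reactor = thread::spawn(move || {
        pool_idle_rx.recv().unwrap();
        // ... reactor thread exits its turn loop here ...
    });

    pool.join().unwrap();
    reactor.join().unwrap();
}
```

Shutting the reactor down first would be wrong: still-running pool tasks could hold I/O resources bound to it, and every operation on them would start failing mid-task.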
@@ -0,0 +1,262 @@
extern crate tokio;
extern crate tokio_executor;
extern crate futures;
use tokio::executor::current_thread::{self, block_on_all, CurrentThread};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use futures::task;
use futures::future::{self, lazy};
use futures::prelude::*;
use futures::sync::oneshot;
#[test]
fn spawn_from_block_on_all() {
    let cnt = Rc::new(Cell::new(0));
    let c = cnt.clone();

    let msg = current_thread::block_on_all(lazy(move || {
        c.set(1 + c.get());

        // Spawn!
        current_thread::spawn(lazy(move || {
            c.set(1 + c.get());
            Ok::<(), ()>(())
        }));

        Ok::<_, ()>("hello")
    })).unwrap();

    assert_eq!(2, cnt.get());
    assert_eq!(msg, "hello");
}
#[test]
fn block_waits() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let (tx, rx) = oneshot::channel();
thread::spawn(|| {
thread::sleep(Duration::from_millis(1000));
tx.send(()).unwrap();
});
block_on_all(rx.then(move |_| {
cnt.set(1 + cnt.get());
Ok::<_, ()>(())
})).unwrap();
assert_eq!(1, cnt2.get());
}
#[test]
fn spawn_many() {
const ITER: usize = 200;
let cnt = Rc::new(Cell::new(0));
let mut current_thread = CurrentThread::new();
for _ in 0..ITER {
let cnt = cnt.clone();
current_thread.spawn(lazy(move || {
cnt.set(1 + cnt.get());
Ok::<(), ()>(())
}));
}
current_thread.run().unwrap();
assert_eq!(cnt.get(), ITER);
}
#[test]
fn does_not_set_global_executor_by_default() {
use tokio_executor::Executor;
block_on_all(lazy(|| {
tokio_executor::DefaultExecutor::current()
.spawn(Box::new(lazy(|| ok())))
.unwrap_err();
ok()
})).unwrap();
}
#[test]
fn spawn_from_block_on_future() {
let cnt = Rc::new(Cell::new(0));
let mut current_thread = CurrentThread::new();
current_thread.block_on(lazy(|| {
let cnt = cnt.clone();
current_thread::spawn(lazy(move || {
cnt.set(1 + cnt.get());
Ok(())
}));
Ok::<_, ()>(())
})).unwrap();
current_thread.run().unwrap();
assert_eq!(1, cnt.get());
}
struct Never(Rc<()>);
impl Future for Never {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
}
}
#[test]
fn outstanding_tasks_are_dropped_when_executor_is_dropped() {
let mut rc = Rc::new(());
let mut current_thread = CurrentThread::new();
current_thread.spawn(Never(rc.clone()));
drop(current_thread);
// Ensure the daemon is dropped
assert!(Rc::get_mut(&mut rc).is_some());
// Using the global spawn fn
let mut rc = Rc::new(());
let mut current_thread = CurrentThread::new();
current_thread.block_on(lazy(|| {
current_thread::spawn(Never(rc.clone()));
Ok::<_, ()>(())
})).unwrap();
drop(current_thread);
// Ensure the daemon is dropped
assert!(Rc::get_mut(&mut rc).is_some());
}
#[test]
#[should_panic]
fn nesting_run() {
block_on_all(lazy(|| {
block_on_all(lazy(|| {
ok()
})).unwrap();
ok()
})).unwrap();
}
#[test]
#[should_panic]
fn run_in_future() {
block_on_all(lazy(|| {
current_thread::spawn(lazy(|| {
block_on_all(lazy(|| {
ok()
})).unwrap();
ok()
}));
ok()
})).unwrap();
}
#[test]
fn tick_on_infini_future() {
let num = Rc::new(Cell::new(0));
struct Infini {
num: Rc<Cell<usize>>,
}
impl Future for Infini {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.num.set(1 + self.num.get());
task::current().notify();
Ok(Async::NotReady)
}
}
CurrentThread::new()
.spawn(Infini {
num: num.clone(),
})
.turn(None)
.unwrap();
assert_eq!(1, num.get());
}
#[test]
fn tasks_are_scheduled_fairly() {
let state = Rc::new(RefCell::new([0, 0]));
struct Spin {
state: Rc<RefCell<[i32; 2]>>,
idx: usize,
}
impl Future for Spin {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
let mut state = self.state.borrow_mut();
if self.idx == 0 {
let diff = state[0] - state[1];
assert!(diff.abs() <= 1);
if state[0] >= 50 {
return Ok(().into());
}
}
state[self.idx] += 1;
if state[self.idx] >= 100 {
return Ok(().into());
}
task::current().notify();
Ok(Async::NotReady)
}
}
block_on_all(lazy(|| {
current_thread::spawn(Spin {
state: state.clone(),
idx: 0,
});
current_thread::spawn(Spin {
state: state,
idx: 1,
});
ok()
})).unwrap();
}
fn ok() -> future::FutureResult<(), ()> {
future::ok(())
}
@@ -1,5 +1,6 @@
extern crate futures;
extern crate tokio;
extern crate env_logger;
use std::thread;
@@ -15,6 +16,8 @@ macro_rules! t {
#[test]
fn hammer() {
let _ = env_logger::init();
let threads = (0..10).map(|_| {
thread::spawn(|| {
let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
@@ -0,0 +1,52 @@
extern crate futures;
extern crate tokio;
extern crate tokio_io;
extern crate env_logger;
use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
use tokio_io::io;
macro_rules! t {
    ($e:expr) => (match $e {
        Ok(e) => e,
        Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
    })
}
#[test]
fn basic_runtime_usage() {
let _ = env_logger::init();
// TODO: Don't require the lazy wrapper
tokio::run(::futures::future::lazy(|| {
let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
let addr = t!(server.local_addr());
let client = TcpStream::connect(&addr);
let server = server.incoming().take(1)
.map_err(|e| println!("accept err = {:?}", e))
.for_each(|socket| {
tokio::spawn({
io::write_all(socket, b"hello")
.map(|_| println!("write done"))
.map_err(|e| println!("write err = {:?}", e))
})
})
.map(|_| println!("accept done"));
let client = client
.map_err(|e| println!("connect err = {:?}", e))
.and_then(|client| {
// Read all
io::read_to_end(client, vec![])
.map(|_| println!("read done"))
.map_err(|e| println!("read err = {:?}", e))
});
tokio::spawn({
server.join(client)
.map(|_| println!("done"))
})
}));
}
@@ -0,0 +1,16 @@
[package]
name = "tokio-executor"
version = "0.1.0"
documentation = "https://docs.rs/tokio-executor"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://github.com/tokio-rs/tokio"
license = "MIT/Apache-2.0"
authors = ["Carl Lerche <me@carllerche.com>"]
description = """
Future execution primitives
"""
keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[dependencies]
futures = "0.1"
@@ -0,0 +1,97 @@
use std::prelude::v1::*;
use std::cell::Cell;
use std::fmt;
thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
/// Represents an executor context.
///
/// For more details, see [`enter` documentation](fn.enter.html)
pub struct Enter {
on_exit: Vec<Box<Callback>>,
permanent: bool,
}
/// An error returned by `enter` if an execution scope has already been
/// entered.
#[derive(Debug)]
pub struct EnterError {
_a: (),
}
/// Marks the current thread as being within the dynamic extent of an
/// executor.
///
/// Executor implementations should call this function before blocking the
/// thread. If `None` is returned, the executor should fail by panicking or
/// taking some other action without blocking the current thread. This prevents
/// deadlocks due to multiple executors competing for the same thread.
///
/// # Error
///
/// Returns an error if the current thread is already marked
pub fn enter() -> Result<Enter, EnterError> {
ENTERED.with(|c| {
if c.get() {
Err(EnterError { _a: () })
} else {
c.set(true);
Ok(Enter {
on_exit: Vec::new(),
permanent: false,
})
}
})
}
impl Enter {
/// Register a callback to be invoked if and when the thread
/// ceased to act as an executor.
pub fn on_exit<F>(&mut self, f: F) where F: FnOnce() + 'static {
self.on_exit.push(Box::new(f));
}
/// Treat the remainder of execution on this thread as part of an
/// executor; used mostly for thread pool worker threads.
///
/// All registered `on_exit` callbacks are *dropped* without being
/// invoked.
pub fn make_permanent(mut self) {
self.permanent = true;
}
}
impl fmt::Debug for Enter {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Enter").finish()
}
}
impl Drop for Enter {
fn drop(&mut self) {
ENTERED.with(|c| {
assert!(c.get());
if self.permanent {
return
}
for callback in self.on_exit.drain(..) {
callback.call();
}
c.set(false);
});
}
}
trait Callback: 'static {
fn call(self: Box<Self>);
}
impl<F: FnOnce() + 'static> Callback for F {
fn call(self: Box<Self>) {
(*self)()
}
}
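The reentrancy guard above can be exercised without the crate itself. Below is a minimal stdlib-only sketch of the same pattern (the names `Enter` and `enter` mirror the diff, but the error type and signatures here are illustrative, not the crate's API):

```rust
use std::cell::Cell;

thread_local!(static ENTERED: Cell<bool> = Cell::new(false));

// Guard marking this thread as "inside an executor"; the flag is
// cleared when the guard is dropped.
pub struct Enter(());

pub fn enter() -> Result<Enter, &'static str> {
    ENTERED.with(|c| {
        if c.get() {
            Err("execution scope already entered on this thread")
        } else {
            c.set(true);
            Ok(Enter(()))
        }
    })
}

impl Drop for Enter {
    fn drop(&mut self) {
        ENTERED.with(|c| c.set(false));
    }
}

fn main() {
    let guard = enter().unwrap();
    // A second executor on the same thread is rejected instead of
    // silently deadlocking.
    assert!(enter().is_err());
    drop(guard);
    // Once the guard is dropped, the thread can act as an executor again.
    assert!(enter().is_ok());
}
```

The real type additionally carries `on_exit` callbacks and a `permanent` flag; this sketch keeps only the enter/exit flag to show the guard mechanics.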
@@ -0,0 +1,154 @@
use super::{Executor, Enter, SpawnError};
use futures::Future;
use std::cell::Cell;
use std::marker::PhantomData;
use std::rc::Rc;
/// Executes futures on the default executor for the current execution context.
///
/// `DefaultExecutor` implements `Executor` and can be used to spawn futures
/// without referencing a specific executor.
///
/// When an executor starts, it sets the `DefaultExecutor` handle to point to an
/// executor (usually itself) that is used to spawn new tasks.
///
/// The current `DefaultExecutor` reference is tracked using a thread-local
/// variable and is set using `tokio_executor::with_default`.
#[derive(Debug, Clone)]
pub struct DefaultExecutor {
// Prevent the handle from moving across threads.
_p: PhantomData<Rc<()>>,
}
impl DefaultExecutor {
/// Returns a handle to the default executor for the current context.
///
/// Futures may be spawned onto the default executor using this handle.
///
/// The returned handle will reference whichever executor is configured as
/// the default **at the time `spawn` is called**. This enables
/// `DefaultExecutor::current()` to be called before an execution context is
/// set up, then passed **into** an execution context before it is used.
pub fn current() -> DefaultExecutor {
DefaultExecutor {
_p: PhantomData,
}
}
}
/// Thread-local tracking the current executor
thread_local!(static EXECUTOR: Cell<Option<*mut Executor>> = Cell::new(None));
// ===== impl DefaultExecutor =====
impl super::Executor for DefaultExecutor {
fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
-> Result<(), SpawnError>
{
EXECUTOR.with(|current_executor| {
match current_executor.get() {
Some(executor) => {
let executor = unsafe { &mut *executor };
executor.spawn(future)
}
None => {
Err(SpawnError::shutdown())
}
}
})
}
}
// ===== global spawn fns =====
/// Submits a future for execution on the default executor -- usually a
/// threadpool.
///
/// Futures are lazy constructs. When they are defined, no work happens. In
/// order for the logic defined by the future to be run, the future must be
/// spawned on an executor. This function is the easiest way to do so.
///
/// This function must be called from an execution context, i.e., from a future
/// that has already been spawned onto an executor.
///
/// Once spawned, the future will execute. The details of how that happens are
/// left up to the executor instance. If the executor is a thread pool, the
/// future will be pushed onto a queue that a worker thread polls from. If the
/// executor is a "current thread" executor, the future might be polled
/// immediately from within the call to `spawn` or it might be pushed onto an
/// internal queue.
///
/// # Panics
///
/// This function will panic if the default executor is not set or if spawning
/// onto the default executor returns an error. To avoid the panic, use the
/// `DefaultExecutor` handle directly.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::spawn;
/// # pub fn dox() {
/// use futures::future::lazy;
///
/// spawn(lazy(|| {
/// println!("running on the default executor");
/// Ok(())
/// }));
/// # }
/// # pub fn main() {}
/// ```
pub fn spawn<T>(future: T)
where T: Future<Item = (), Error = ()> + Send + 'static,
{
DefaultExecutor::current().spawn(Box::new(future))
.unwrap()
}
/// Set the default executor for the duration of the closure
///
/// # Panics
///
/// This function panics if there already is a default executor set.
pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R
where T: Executor,
F: FnOnce(&mut Enter) -> R
{
EXECUTOR.with(|cell| {
assert!(cell.get().is_none(), "default executor already set for execution context");
// Ensure that the executor is removed from the thread-local context
// when leaving the scope. This handles cases that involve panicking.
struct Reset<'a>(&'a Cell<Option<*mut Executor>>);
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
self.0.set(None);
}
}
let _reset = Reset(cell);
// While scary, this is safe. The function takes a
// `&mut Executor`, which guarantees that the reference lives for the
// duration of `with_default`.
//
// Because we are always clearing the TLS value at the end of the
// function, we can cast the reference to 'static which thread-local
// cells require.
let executor = unsafe { hide_lt(executor as &mut _ as *mut _) };
cell.set(Some(executor));
f(enter)
})
}
unsafe fn hide_lt<'a>(p: *mut (Executor + 'a)) -> *mut (Executor + 'static) {
use std::mem;
mem::transmute(p)
}
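The thread-local handoff performed by `with_default` and `DefaultExecutor` can be sketched safely, without the raw-pointer/`hide_lt` trick, by storing a boxed task queue instead. All names below are illustrative, and the real implementation also resets the slot on panic via a drop guard, which this sketch omits:

```rust
use std::cell::RefCell;

type Task = Box<dyn FnOnce()>;

thread_local! {
    // The "default executor" for this thread: a queue of boxed tasks.
    static DEFAULT: RefCell<Option<Vec<Task>>> = RefCell::new(None);
}

// Install a default executor for the duration of `f`, then tear it down,
// mirroring the set/reset dance in `with_default`.
pub fn with_default<R>(f: impl FnOnce() -> R) -> (R, Vec<Task>) {
    DEFAULT.with(|d| {
        assert!(d.borrow().is_none(), "default executor already set");
        *d.borrow_mut() = Some(Vec::new());
    });
    let out = f();
    let tasks = DEFAULT.with(|d| d.borrow_mut().take().unwrap());
    (out, tasks)
}

// Spawn onto the current default executor; fails outside an execution
// context, like `DefaultExecutor` returning `SpawnError::shutdown()`.
pub fn spawn(task: Task) -> Result<(), &'static str> {
    DEFAULT.with(|d| match d.borrow_mut().as_mut() {
        Some(queue) => {
            queue.push(task);
            Ok(())
        }
        None => Err("no default executor set"),
    })
}

fn main() {
    // Outside an execution context, spawning fails.
    assert!(spawn(Box::new(|| println!("never runs"))).is_err());

    // Inside one, the task lands on the installed executor's queue.
    let ((), tasks) = with_default(|| {
        spawn(Box::new(|| println!("hello"))).unwrap();
    });
    assert_eq!(tasks.len(), 1);
}
```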
@@ -0,0 +1,189 @@
//! Task execution utilities.
//!
//! In the Tokio execution model, futures are lazy. When a future is created, no
//! work is performed. In order for the work defined by the future to happen,
//! the future must be submitted to an executor. A future that is submitted to
//! an executor is called a "task".
//!
//! The executor is responsible for ensuring that [`Future::poll`] is
//! called whenever the task is [notified]. Notification happens when the
//! internal state of a task transitions from "not ready" to "ready". For
//! example, a socket might have received data and a call to `read` will now be
//! able to succeed.
#![deny(missing_docs, missing_debug_implementations, warnings)]
#![doc(html_root_url = "https://docs.rs/tokio-executor/0.1")]
extern crate futures;
mod enter;
mod global;
pub mod park;
pub use enter::{enter, Enter, EnterError};
pub use global::{spawn, with_default, DefaultExecutor};
use futures::Future;
/// A value that executes futures.
///
/// The [`spawn`] function is used to submit a future to an executor. Once
/// submitted, the executor takes ownership of the future and becomes
/// responsible for driving the future to completion.
///
/// The strategy employed by the executor to handle the future is less defined
/// and is left up to the `Executor` implementation. The `Executor` instance is
/// expected to call [`poll`] on the future once it has been notified; however,
/// the "when" and "how" can vary greatly.
///
/// For example, the executor might be a thread pool, in which case a set of
/// threads has already been spawned and the future is inserted into a
/// queue. A thread will acquire the future and poll it.
///
/// The `Executor` trait is only for futures that **are** `Send`. These are most
/// common. There currently is no trait that describes executors that operate
/// entirely on the current thread (i.e., are able to spawn futures that are not
/// `Send`). Note that single-threaded executors can still implement `Executor`,
/// but only futures that are `Send` can be spawned via the trait.
///
/// # Errors
///
/// The [`spawn`] function returns `Result` with an error type of `SpawnError`.
/// This error type represents the reason that the executor was unable to spawn
/// the future. The two currently represented scenarios are:
///
/// * An executor being at capacity or full. As such, the executor is not able
/// to accept a new future. This error state is expected to be transient.
/// * An executor has been shutdown and can no longer accept new futures. This
/// error state is expected to be permanent.
///
/// If a caller encounters an at capacity error, the caller should try to shed
/// load. This can be as simple as dropping the future that was spawned.
///
/// If the caller encounters a shutdown error, the caller should attempt to
/// shut down gracefully.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::Executor;
/// # fn docs(my_executor: &mut Executor) {
/// use futures::future::lazy;
/// my_executor.spawn(Box::new(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// # }
/// # fn main() {}
/// ```
///
/// [`spawn`]: #tymethod.spawn
/// [`poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
pub trait Executor {
/// Spawns a future object to run on this executor.
///
/// `future` is passed to the executor, which will begin running it. The
/// future may run on the current thread or another thread at the discretion
/// of the `Executor` implementation.
///
/// # Panics
///
/// Implementors are encouraged to avoid panics. However, a panic is
/// permitted and the caller should check the implementation specific
/// documentation for more details on possible panics.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::Executor;
/// # fn docs(my_executor: &mut Executor) {
/// use futures::future::lazy;
/// my_executor.spawn(Box::new(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// # }
/// # fn main() {}
/// ```
fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
-> Result<(), SpawnError>;
/// Provides a best effort **hint** to whether or not `spawn` will succeed.
///
/// This function may return both false positives **and** false negatives.
/// If `status` returns `Ok`, then a call to `spawn` will *probably*
/// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
/// *probably* fail, but may succeed.
///
/// This allows a caller to avoid creating the task if the call to `spawn`
/// has a high likelihood of failing.
///
/// # Panics
///
/// This function must not panic. Implementors must ensure that panics do
/// not happen.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::Executor;
/// # fn docs(my_executor: &mut Executor) {
/// use futures::future::lazy;
///
/// if my_executor.status().is_ok() {
/// my_executor.spawn(Box::new(lazy(|| {
/// println!("running on the executor");
/// Ok(())
/// }))).unwrap();
/// } else {
/// println!("the executor is not in a good state");
/// }
/// # }
/// # fn main() {}
/// ```
fn status(&self) -> Result<(), SpawnError> {
Ok(())
}
}
/// Errors returned by `Executor::spawn`.
///
/// Spawn errors should represent relatively rare scenarios. Currently, the two
/// scenarios represented by `SpawnError` are:
///
/// * An executor being at capacity or full. As such, the executor is not able
/// to accept a new future. This error state is expected to be transient.
/// * An executor has been shutdown and can no longer accept new futures. This
/// error state is expected to be permanent.
#[derive(Debug)]
pub struct SpawnError {
is_shutdown: bool,
}
impl SpawnError {
/// Return a new `SpawnError` reflecting a shutdown executor failure.
pub fn shutdown() -> Self {
SpawnError { is_shutdown: true }
}
/// Return a new `SpawnError` reflecting an executor at capacity failure.
pub fn at_capacity() -> Self {
SpawnError { is_shutdown: false }
}
/// Returns `true` if the error reflects a shutdown executor failure.
pub fn is_shutdown(&self) -> bool {
self.is_shutdown
}
/// Returns `true` if the error reflects an executor at capacity failure.
pub fn is_at_capacity(&self) -> bool {
!self.is_shutdown
}
}
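The error contract described above (a transient "at capacity" failure versus a permanent "shutdown" failure) is easiest to see against a toy executor. The bounded queue below is purely an illustration, not part of the crate; only the `SpawnError` constructor/predicate names are taken from the diff:

```rust
#[derive(Debug)]
pub struct SpawnError {
    is_shutdown: bool,
}

impl SpawnError {
    pub fn at_capacity() -> Self {
        SpawnError { is_shutdown: false }
    }
    pub fn is_at_capacity(&self) -> bool {
        !self.is_shutdown
    }
}

// A toy executor with a bounded queue, so `spawn` can actually hit the
// transient "at capacity" error.
pub struct Bounded {
    queue: Vec<Box<dyn FnOnce() + Send>>,
    capacity: usize,
}

impl Bounded {
    pub fn new(capacity: usize) -> Self {
        Bounded { queue: Vec::new(), capacity }
    }

    // Mirrors `Executor::spawn`: accept the task or report why not.
    pub fn spawn(
        &mut self,
        task: Box<dyn FnOnce() + Send>,
    ) -> Result<(), SpawnError> {
        if self.queue.len() == self.capacity {
            return Err(SpawnError::at_capacity());
        }
        self.queue.push(task);
        Ok(())
    }

    // Mirrors `Executor::status`: a best-effort hint that callers can
    // check before constructing an expensive task.
    pub fn status(&self) -> Result<(), SpawnError> {
        if self.queue.len() == self.capacity {
            Err(SpawnError::at_capacity())
        } else {
            Ok(())
        }
    }
}

fn main() {
    let mut exec = Bounded::new(1);
    assert!(exec.status().is_ok());
    exec.spawn(Box::new(|| println!("task"))).unwrap();

    // The executor is now full: `status` hints at it and `spawn` reports
    // the transient error, so the caller can shed load and retry later.
    assert!(exec.status().is_err());
    let err = exec.spawn(Box::new(|| ())).unwrap_err();
    assert!(err.is_at_capacity());
}
```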
@@ -0,0 +1,294 @@
//! Abstraction over blocking and unblocking the current thread.
//!
//! Provides an abstraction over blocking the current thread. This is similar to
//! the park / unpark constructs provided by [`std`] but made generic. This
//! allows embedding custom functionality to perform when the thread is blocked.
//!
//! A blocked [`Park`][p] instance is unblocked by calling [`unpark`] on its
//! [`Unpark`][up] handle.
//!
//! The [`ParkThread`] struct implements [`Park`][p] using
//! [`thread::park`][`std`] to put the thread to sleep. The Tokio reactor also
//! implements park, but uses [`mio::Poll`][mio] to block the thread instead.
//!
//! The [`Park`][p] trait is composable. A timer implementation might decorate a
//! [`Park`][p] implementation by checking if any timeouts have elapsed after
//! the inner [`Park`][p] implementation unblocks.
//!
//! # Model
//!
//! Conceptually, each [`Park`][p] instance has an associated token, which is
//! initially not present:
//!
//! * The [`park`] method blocks the current thread unless or until the token
//! is available, at which point it atomically consumes the token.
//! * The [`unpark`] method atomically makes the token available if it wasn't
//! already.
//!
//! Some things to note:
//!
//! * If [`unpark`] is called before [`park`], the next call to [`park`] will
//! **not** block the thread.
//! * **Spurious** wakeups are permitted, i.e., the [`park`] method may unblock
//! even if [`unpark`] was not called.
//! * [`park_timeout`] does the same as [`park`] but allows specifying a maximum
//! time to block the thread for.
//!
//! [`std`]: https://doc.rust-lang.org/std/thread/fn.park.html
//! [`thread::park`]: https://doc.rust-lang.org/std/thread/fn.park.html
//! [`ParkThread`]: struct.ParkThread.html
//! [p]: trait.Park.html
//! [`park`]: trait.Park.html#tymethod.park
//! [`park_timeout`]: trait.Park.html#tymethod.park_timeout
//! [`unpark`]: trait.Unpark.html#tymethod.unpark
//! [up]: trait.Unpark.html
//! [mio]: https://docs.rs/mio/0.6.13/mio/struct.Poll.html
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
/// Block the current thread.
///
/// See [module documentation][mod] for more details.
///
/// [mod]: ../index.html
pub trait Park {
/// Unpark handle type for the `Park` implementation.
type Unpark: Unpark;
/// Error returned by `park`
type Error;
/// Get a new `Unpark` handle associated with this `Park` instance.
fn unpark(&self) -> Self::Unpark;
/// Block the current thread unless or until the token is available.
///
/// A call to `park` does not guarantee that the thread will remain blocked
/// forever, and callers should be prepared for this possibility. This
/// function may wake up spuriously for any reason.
///
/// See [module documentation][mod] for more details.
///
/// # Panics
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation.
///
/// [mod]: ../index.html
fn park(&mut self) -> Result<(), Self::Error>;
/// Park the current thread for at most `duration`.
///
/// This function is the same as `park` but allows specifying a maximum time
/// to block the thread for.
///
/// As with `park`, there is no guarantee that the thread will remain
/// blocked for any amount of time. Spurious wakeups are permitted for any
/// reason.
///
/// See [module documentation][mod] for more details.
///
/// # Panics
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
/// `Park` implementation.
///
/// [mod]: ../index.html
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
}
/// Unblock a thread blocked by the associated [`Park`] instance.
///
/// See [module documentation][mod] for more details.
///
/// [mod]: ../index.html
/// [`Park`]: trait.Park.html
pub trait Unpark: Sync + Send + 'static {
/// Unblock a thread that is blocked by the associated `Park` handle.
///
/// Calling `unpark` atomically makes available the unpark token, if it is
/// not already available.
///
/// See [module documentation][mod] for more details.
///
/// # Panics
///
/// This function **should** not panic, but ultimately, panics are left as
/// an implementation detail. Refer to the documentation for the specific
/// `Unpark` implementation.
///
/// [mod]: ../index.html
fn unpark(&self);
}
/// Blocks the current thread using a condition variable.
///
/// Implements the [`Park`] functionality by using a condition variable. An
/// atomic variable is also used to avoid using the condition variable if
/// possible.
///
/// The condition variable is cached in a thread-local variable and is shared
/// across all `ParkThread` instances created on the same thread. This also
/// means that an instance of `ParkThread` might be unblocked by a handle
/// associated with a different `ParkThread` instance.
#[derive(Debug)]
pub struct ParkThread {
_anchor: PhantomData<Rc<()>>,
}
/// Error returned by [`ParkThread`]
///
/// This is currently never returned, but it might be at some point in the future.
///
/// [`ParkThread`]: struct.ParkThread.html
#[derive(Debug)]
pub struct ParkError {
_p: (),
}
/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
pub struct UnparkThread {
inner: Arc<Inner>,
}
#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}
const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;
thread_local! {
static CURRENT_PARK_THREAD: Arc<Inner> = Arc::new(Inner {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
}
// ===== impl ParkThread =====
impl ParkThread {
/// Create a new `ParkThread` handle for the current thread.
///
/// This type cannot be moved to other threads, so it should be created on
/// the thread that the caller intends to park.
pub fn new() -> ParkThread {
ParkThread {
_anchor: PhantomData,
}
}
/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> R
where F: FnOnce(&Arc<Inner>) -> R,
{
CURRENT_PARK_THREAD.with(|inner| f(inner))
}
}
impl Park for ParkThread {
type Unpark = UnparkThread;
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
let inner = self.with_current(|inner| inner.clone());
UnparkThread { inner }
}
fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(None))
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(Some(duration)))
}
}
// ===== impl UnparkThread =====
impl Unpark for UnparkThread {
fn unpark(&self) {
self.inner.unpark();
}
}
// ===== impl Inner =====
impl Inner {
/// Park the current thread for at most `dur`.
fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return Ok(()),
IDLE => {},
_ => unreachable!(),
}
// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();
// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
return Ok(());
}
IDLE => {},
_ => unreachable!(),
}
m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
};
// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification.
self.state.store(IDLE, Ordering::SeqCst);
// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
drop(m);
Ok(())
}
fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}
// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
// Transition from SLEEP -> NOTIFY
match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
SLEEP => {}
_ => return,
}
// Wakeup the sleeper
self.condvar.notify_one();
}
}
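The three-state token protocol used by `Inner` above (IDLE/NOTIFY/SLEEP, with the lock taken only when a sleeper may be involved) can be condensed into a runnable stdlib sketch. `compare_exchange` stands in for `compare_and_swap` (the latter was later deprecated), and the `Parker` name is illustrative:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;

#[derive(Default)]
pub struct Parker {
    state: AtomicUsize,
    mutex: Mutex<()>,
    condvar: Condvar,
}

impl Parker {
    pub fn park(&self) {
        // Fast path: consume a pending token without taking the lock.
        if self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst).is_ok() {
            return;
        }
        let mut guard = self.mutex.lock().unwrap();
        // Transition IDLE -> SLEEP. A concurrent `unpark` may have raced
        // us to NOTIFY; consume that token and return instead of sleeping.
        if self.state.compare_exchange(IDLE, SLEEP, SeqCst, SeqCst).is_err() {
            self.state.store(IDLE, SeqCst);
            return;
        }
        while self.state.load(SeqCst) == SLEEP {
            guard = self.condvar.wait(guard).unwrap();
        }
        // Consume the notification on the way out.
        self.state.store(IDLE, SeqCst);
    }

    pub fn unpark(&self) {
        // IDLE -> NOTIFY requires no lock; pending tokens coalesce.
        match self.state.compare_exchange(IDLE, NOTIFY, SeqCst, SeqCst) {
            Ok(_) => return,
            Err(NOTIFY) => return, // already notified
            Err(_) => {}           // SLEEP: must wake the sleeper
        }
        // The other side is sleeping (or about to): take the lock so the
        // store cannot slip between its state check and `wait`.
        let _guard = self.mutex.lock().unwrap();
        if self.state.compare_exchange(SLEEP, NOTIFY, SeqCst, SeqCst).is_ok() {
            self.condvar.notify_one();
        }
    }
}

fn main() {
    // Token made available before `park`: `park` returns immediately.
    let parker = Arc::new(Parker::default());
    parker.unpark();
    parker.park();

    // `unpark` from another thread wakes a sleeping parker.
    let p = parker.clone();
    let waker = thread::spawn(move || p.unpark());
    parker.park();
    waker.join().unwrap();
}
```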
@@ -0,0 +1,11 @@
extern crate tokio_executor;
extern crate futures;
use tokio_executor::*;
use futures::future::lazy;
#[test]
fn spawn_out_of_executor_context() {
let res = DefaultExecutor::current().spawn(Box::new(lazy(|| Ok(()))));
assert!(res.is_err());
}
@@ -0,0 +1,26 @@
[package]
name = "tokio-threadpool"
version = "0.1.0"
documentation = "https://docs.rs/tokio-threadpool"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://github.com/tokio-rs/tokio"
license = "MIT/Apache-2.0"
authors = ["Carl Lerche <me@carllerche.com>"]
description = """
A Future aware thread pool based on work stealing.
"""
keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1", path = "../tokio-executor" }
futures = "0.1"
coco = "0.3"
num_cpus = "1.2"
rand = "0.3"
log = "0.3"
[dev-dependencies]
tokio-timer = "0.1"
env_logger = "0.4"
futures-cpupool = "0.1.7"
@@ -0,0 +1,52 @@
# Tokio Thread Pool
A library for scheduling execution of futures concurrently across a pool of
threads.
**Note**: This library isn't quite ready for use.
### Why not Rayon?
Rayon is designed to handle parallelizing single computations by breaking them
into smaller chunks. The scheduling for each individual chunk doesn't matter as
long as the root computation completes in a timely fashion. In other words,
Rayon does not provide any guarantees of fairness with regard to how each task
gets scheduled.
On the other hand, `tokio-threadpool` is a general purpose scheduler and
attempts to schedule each task fairly. This is the ideal behavior when
scheduling a set of unrelated tasks.
### Why not futures-cpupool?
Benchmarks show it to be roughly 10x slower.
## Examples
```rust
extern crate tokio_threadpool;
extern crate futures;
use tokio_threadpool::*;
use futures::*;
use futures::sync::oneshot;
pub fn main() {
let (tx, _pool) = ThreadPool::new();
let res = oneshot::spawn(future::lazy(|| {
println!("Running on the pool");
Ok::<_, ()>("complete")
}), &tx);
println!("Result: {:?}", res.wait());
}
```
## License
`tokio-threadpool` is primarily distributed under the terms of both the MIT
license and the Apache License (Version 2.0), with portions covered by various
BSD-like licenses.
See LICENSE-APACHE and LICENSE-MIT for details.
@@ -0,0 +1,162 @@
#![feature(test)]
extern crate futures;
extern crate futures_pool;
extern crate futures_cpupool;
extern crate num_cpus;
extern crate test;
const NUM_SPAWN: usize = 10_000;
const NUM_YIELD: usize = 1_000;
const TASKS_PER_CPU: usize = 50;
mod us {
use futures::{task, Async};
use futures::future::{self, Executor};
use futures_pool::*;
use num_cpus;
use test;
use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
#[bench]
fn spawn_many(b: &mut test::Bencher) {
let (sched_tx, _scheduler) = Pool::new();
let (tx, rx) = mpsc::sync_channel(10);
let rem = Arc::new(AtomicUsize::new(0));
b.iter(move || {
rem.store(super::NUM_SPAWN, SeqCst);
for _ in 0..super::NUM_SPAWN {
let tx = tx.clone();
let rem = rem.clone();
sched_tx.execute(future::lazy(move || {
if 1 == rem.fetch_sub(1, SeqCst) {
tx.send(()).unwrap();
}
Ok(())
})).ok().unwrap();
}
let _ = rx.recv().unwrap();
});
}
#[bench]
fn yield_many(b: &mut test::Bencher) {
let (sched_tx, _scheduler) = Pool::new();
let tasks = super::TASKS_PER_CPU * num_cpus::get();
let (tx, rx) = mpsc::sync_channel(tasks);
b.iter(move || {
for _ in 0..tasks {
let mut rem = super::NUM_YIELD;
let tx = tx.clone();
sched_tx.execute(future::poll_fn(move || {
rem -= 1;
if rem == 0 {
tx.send(()).unwrap();
Ok(Async::Ready(()))
} else {
// Notify the current task
task::current().notify();
// Not ready
Ok(Async::NotReady)
}
})).ok().unwrap();
}
for _ in 0..tasks {
let _ = rx.recv().unwrap();
}
});
}
}
// In this case, CPU pool completes the benchmark faster, but this is due to how
// CpuPool currently behaves, starving other futures. This completes the
// benchmark quickly but results in poor runtime characteristics for a thread
// pool.
//
// See alexcrichton/futures-rs#617
//
mod cpupool {
use futures::{task, Async};
use futures::future::{self, Executor};
use futures_cpupool::*;
use num_cpus;
use test;
use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
#[bench]
fn spawn_many(b: &mut test::Bencher) {
let pool = CpuPool::new(num_cpus::get());
let (tx, rx) = mpsc::sync_channel(10);
let rem = Arc::new(AtomicUsize::new(0));
b.iter(move || {
rem.store(super::NUM_SPAWN, SeqCst);
for _ in 0..super::NUM_SPAWN {
let tx = tx.clone();
let rem = rem.clone();
pool.execute(future::lazy(move || {
if 1 == rem.fetch_sub(1, SeqCst) {
tx.send(()).unwrap();
}
Ok(())
})).ok().unwrap();
}
let _ = rx.recv().unwrap();
});
}
#[bench]
fn yield_many(b: &mut test::Bencher) {
let pool = CpuPool::new(num_cpus::get());
let tasks = super::TASKS_PER_CPU * num_cpus::get();
let (tx, rx) = mpsc::sync_channel(tasks);
b.iter(move || {
for _ in 0..tasks {
let mut rem = super::NUM_YIELD;
let tx = tx.clone();
pool.execute(future::poll_fn(move || {
rem -= 1;
if rem == 0 {
tx.send(()).unwrap();
Ok(Async::Ready(()))
} else {
// Notify the current task
task::current().notify();
// Not ready
Ok(Async::NotReady)
}
})).ok().unwrap();
}
for _ in 0..tasks {
let _ = rx.recv().unwrap();
}
});
}
}
@@ -0,0 +1,72 @@
#![feature(test)]
extern crate futures;
extern crate futures_pool;
extern crate futures_cpupool;
extern crate num_cpus;
extern crate test;
const ITER: usize = 20_000;
mod us {
use futures::future::{self, Executor};
use futures_pool::*;
use test;
use std::sync::mpsc;
#[bench]
fn chained_spawn(b: &mut test::Bencher) {
let (sched_tx, _scheduler) = Pool::new();
fn spawn(sched_tx: Sender, res_tx: mpsc::Sender<()>, n: usize) {
if n == 0 {
res_tx.send(()).unwrap();
} else {
let sched_tx2 = sched_tx.clone();
sched_tx.execute(future::lazy(move || {
spawn(sched_tx2, res_tx, n - 1);
Ok(())
})).ok().unwrap();
}
}
b.iter(move || {
let (res_tx, res_rx) = mpsc::channel();
spawn(sched_tx.clone(), res_tx, super::ITER);
res_rx.recv().unwrap();
});
}
}
mod cpupool {
use futures::future::{self, Executor};
use futures_cpupool::*;
use num_cpus;
use test;
use std::sync::mpsc;
#[bench]
fn chained_spawn(b: &mut test::Bencher) {
let pool = CpuPool::new(num_cpus::get());
fn spawn(pool: CpuPool, res_tx: mpsc::Sender<()>, n: usize) {
if n == 0 {
res_tx.send(()).unwrap();
} else {
let pool2 = pool.clone();
pool.execute(future::lazy(move || {
spawn(pool2, res_tx, n - 1);
Ok(())
})).ok().unwrap();
}
}
b.iter(move || {
let (res_tx, res_rx) = mpsc::channel();
spawn(pool.clone(), res_tx, super::ITER);
res_rx.recv().unwrap();
});
}
}
@@ -0,0 +1,46 @@
extern crate futures;
extern crate tokio_threadpool;
extern crate env_logger;
use tokio_threadpool::*;
use futures::future::{self, Executor};
use std::sync::mpsc;
const ITER: usize = 2_000_000;
// const ITER: usize = 30;
fn chained_spawn() {
let pool = ThreadPool::new();
let tx = pool.sender().clone();
fn spawn(tx: Sender, res_tx: mpsc::Sender<()>, n: usize) {
if n == 0 {
res_tx.send(()).unwrap();
} else {
let tx2 = tx.clone();
tx.execute(future::lazy(move || {
spawn(tx2, res_tx, n - 1);
Ok(())
})).ok().unwrap();
}
}
loop {
println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
let (res_tx, res_rx) = mpsc::channel();
for _ in 0..10 {
spawn(tx.clone(), res_tx.clone(), ITER);
}
for _ in 0..10 {
res_rx.recv().unwrap();
}
}
}
pub fn main() {
let _ = ::env_logger::init();
chained_spawn();
}
@@ -0,0 +1,21 @@
extern crate futures;
extern crate tokio_threadpool;
extern crate env_logger;
use tokio_threadpool::*;
use futures::*;
use futures::sync::oneshot;
pub fn main() {
let _ = ::env_logger::init();
let pool = ThreadPool::new();
let tx = pool.sender().clone();
let res = oneshot::spawn(future::lazy(|| {
println!("Running on the pool");
Ok::<_, ()>("complete")
}), &tx);
println!("Result: {:?}", res.wait());
}
@@ -0,0 +1,34 @@
extern crate futures;
extern crate tokio_threadpool;
extern crate tokio_timer;
extern crate env_logger;
use tokio_threadpool::*;
use tokio_timer::Timer;
use futures::*;
use futures::sync::oneshot::spawn;
use std::thread;
use std::time::Duration;
pub fn main() {
let _ = ::env_logger::init();
let timer = Timer::default();
{
let pool = ThreadPool::new();
let tx = pool.sender().clone();
let fut = timer.interval(Duration::from_millis(300))
.for_each(|_| {
println!("~~~~~ Hello ~~~");
Ok(())
})
.map_err(|_| unimplemented!());
spawn(fut, &tx).wait().unwrap();
}
thread::sleep(Duration::from_millis(100));
}
@@ -0,0 +1,331 @@
extern crate tokio_threadpool;
extern crate tokio_executor;
extern crate futures;
extern crate env_logger;
use tokio_threadpool::*;
use futures::{Poll, Sink, Stream, Async};
use futures::future::{Future, lazy};
use std::cell::Cell;
use std::sync::{mpsc, Arc};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
thread_local!(static FOO: Cell<u32> = Cell::new(0));
#[test]
fn natural_shutdown_simple_futures() {
let _ = ::env_logger::init();
for _ in 0..1_000 {
static NUM_INC: AtomicUsize = ATOMIC_USIZE_INIT;
static NUM_DEC: AtomicUsize = ATOMIC_USIZE_INIT;
FOO.with(|f| {
f.set(1);
let pool = Builder::new()
.around_worker(|w, _| {
NUM_INC.fetch_add(1, Relaxed);
w.run();
NUM_DEC.fetch_add(1, Relaxed);
})
.build();
let tx = pool.sender().clone();
let a = {
let (t, rx) = mpsc::channel();
tx.spawn(lazy(move || {
// Makes sure this runs on a worker thread
FOO.with(|f| assert_eq!(f.get(), 0));
t.send("one").unwrap();
Ok(())
})).unwrap();
rx
};
let b = {
let (t, rx) = mpsc::channel();
tx.spawn(lazy(move || {
// Makes sure this runs on a worker thread
FOO.with(|f| assert_eq!(f.get(), 0));
t.send("two").unwrap();
Ok(())
})).unwrap();
rx
};
drop(tx);
assert_eq!("one", a.recv().unwrap());
assert_eq!("two", b.recv().unwrap());
// Wait for the pool to shutdown
pool.shutdown().wait().unwrap();
// Assert that at least one thread started
let num_inc = NUM_INC.load(Relaxed);
assert!(num_inc > 0);
// Assert that all threads shutdown
let num_dec = NUM_DEC.load(Relaxed);
assert_eq!(num_inc, num_dec);
});
}
}
#[test]
fn force_shutdown_drops_futures() {
let _ = ::env_logger::init();
for _ in 0..1_000 {
let num_inc = Arc::new(AtomicUsize::new(0));
let num_dec = Arc::new(AtomicUsize::new(0));
let num_drop = Arc::new(AtomicUsize::new(0));
struct Never(Arc<AtomicUsize>);
impl Future for Never {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(Async::NotReady)
}
}
impl Drop for Never {
fn drop(&mut self) {
self.0.fetch_add(1, Relaxed);
}
}
let a = num_inc.clone();
let b = num_dec.clone();
let mut pool = Builder::new()
.around_worker(move |w, _| {
a.fetch_add(1, Relaxed);
w.run();
b.fetch_add(1, Relaxed);
})
.build();
let mut tx = pool.sender().clone();
tx.spawn(Never(num_drop.clone())).unwrap();
// Wait for the pool to shutdown
pool.shutdown_now().wait().unwrap();
// Assert that only a single thread was spawned.
let a = num_inc.load(Relaxed);
assert!(a >= 1);
// Assert that all threads shutdown
let b = num_dec.load(Relaxed);
assert_eq!(a, b);
// Assert that the future was dropped
let c = num_drop.load(Relaxed);
assert_eq!(c, 1);
}
}
#[test]
fn thread_shutdown_timeout() {
use std::sync::Mutex;
let _ = ::env_logger::init();
let (shutdown_tx, shutdown_rx) = mpsc::channel();
let (complete_tx, complete_rx) = mpsc::channel();
let t = Mutex::new(shutdown_tx);
let pool = Builder::new()
.keep_alive(Some(Duration::from_millis(200)))
.around_worker(move |w, _| {
w.run();
// There could be multiple threads here
let _ = t.lock().unwrap().send(());
})
.build();
let tx = pool.sender().clone();
let t = complete_tx.clone();
tx.spawn(lazy(move || {
t.send(()).unwrap();
Ok(())
})).unwrap();
// The future completes
complete_rx.recv().unwrap();
// The thread shuts down eventually
shutdown_rx.recv().unwrap();
// Futures can still be run
tx.spawn(lazy(move || {
complete_tx.send(()).unwrap();
Ok(())
})).unwrap();
complete_rx.recv().unwrap();
pool.shutdown().wait().unwrap();
}
#[test]
fn many_oneshot_futures() {
const NUM: usize = 10_000;
let _ = ::env_logger::init();
for _ in 0..50 {
let pool = ThreadPool::new();
let mut tx = pool.sender().clone();
let cnt = Arc::new(AtomicUsize::new(0));
for _ in 0..NUM {
let cnt = cnt.clone();
tx.spawn(lazy(move || {
cnt.fetch_add(1, Relaxed);
Ok(())
})).unwrap();
}
// Wait for the pool to shut down
pool.shutdown().wait().unwrap();
let num = cnt.load(Relaxed);
assert_eq!(num, NUM);
}
}

// Build `TRACKS` independent chains of channels, each `CHAIN` links long, and
// cycle a message through every chain `CYCLES` times before completing.
#[test]
fn many_multishot_futures() {
use futures::sync::mpsc;
const CHAIN: usize = 200;
const CYCLES: usize = 5;
const TRACKS: usize = 50;
let _ = ::env_logger::init();
for _ in 0..50 {
let pool = ThreadPool::new();
let mut pool_tx = pool.sender().clone();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = mpsc::channel(10);
for _ in 0..CHAIN {
let (next_tx, next_rx) = mpsc::channel(10);
let rx = chain_rx
.map_err(|e| panic!("{:?}", e));
// Forward all the messages
pool_tx.spawn(next_tx
.send_all(rx)
.map(|_| ())
.map_err(|e| panic!("{:?}", e))
).unwrap();
chain_rx = next_rx;
}
// This final task cycles if needed
let (final_tx, final_rx) = mpsc::channel(10);
let cycle_tx = start_tx.clone();
let mut rem = CYCLES;
pool_tx.spawn(chain_rx.take(CYCLES as u64).for_each(move |msg| {
rem -= 1;
let send = if rem == 0 {
final_tx.clone().send(msg)
} else {
cycle_tx.clone().send(msg)
};
send.then(|res| {
res.unwrap();
Ok(())
})
})).unwrap();
start_txs.push(start_tx);
final_rxs.push(final_rx);
}
for start_tx in start_txs {
start_tx.send("ping").wait().unwrap();
}
for final_rx in final_rxs {
final_rx.wait().next().unwrap().unwrap();
}
// Shut down the pool
pool.shutdown().wait().unwrap();
}
}

// Calling `tokio_executor::spawn` from inside a task should use the pool as
// the default executor.
#[test]
fn global_executor_is_configured() {
let pool = ThreadPool::new();
let tx = pool.sender().clone();
let (signal_tx, signal_rx) = mpsc::channel();
tx.spawn(lazy(move || {
tokio_executor::spawn(lazy(move || {
signal_tx.send(()).unwrap();
Ok(())
}));
Ok(())
})).unwrap();
signal_rx.recv().unwrap();
pool.shutdown().wait().unwrap();
}

#[test]
fn new_threadpool_is_idle() {
let pool = ThreadPool::new();
pool.shutdown_on_idle().wait().unwrap();
}

// `shutdown_on_idle` should not resolve while a spawned task is still pending.
#[test]
fn busy_threadpool_is_not_idle() {
use futures::sync::oneshot;
let pool = ThreadPool::new();
let tx = pool.sender().clone();
let (term_tx, term_rx) = oneshot::channel();
tx.spawn(term_rx.then(|_| {
Ok(())
})).unwrap();
let mut idle = pool.shutdown_on_idle();
futures::lazy(|| {
assert!(idle.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
}).wait().unwrap();
term_tx.send(()).unwrap();
idle.wait().unwrap();
}