Skip to content

Commit

Permalink
Current thread runtime (#308)
Browse files Browse the repository at this point in the history
This patch introduces a version of `Runtime` that runs all components on
the current thread. This allows users to spawn futures that do not implement
`Send`.
  • Loading branch information
kpp authored and carllerche committed May 2, 2018
1 parent 6a0ecef commit 2465483
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Builder {
/// # extern crate tokio;
/// # use tokio::runtime::Builder;
/// # pub fn main() {
/// let runtime = Builder::new().build();
/// let runtime = Builder::new().build().unwrap();
/// // ... call runtime.run(...)
/// # let _ = runtime;
/// # }
Expand Down
72 changes: 72 additions & 0 deletions src/runtime/current_thread/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//! A runtime implementation that runs everything on the current thread.
//!
//! [`current_thread::Runtime`][rt] is similar to the primary
//! [`Runtime`][concurrent-rt] except that it runs all components on the current
//! thread instead of using a thread pool. This means that it is able to spawn
//! futures that do not implement `Send`.
//!
//! Same as the default [`Runtime`][concurrent-rt], the
//! [`current_thread::Runtime`][rt] includes:
//!
//! * A [reactor] to drive I/O resources.
//! * An [executor] to execute tasks that use these I/O resources.
//! * A [timer] for scheduling work to run after a set period of time.
//!
//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself
//! and cannot be safely moved to other threads.
//!
//! # Spawning from other threads
//!
//! By default, [`current_thread::Runtime`][rt] does not provide a way to spawn
//! tasks from other threads. However, this can be accomplished by using a
//! [`mpsc::channel`][chan]. To do so, create a channel to send the task, then
//! spawn a task on [`current_thread::Runtime`][rt] that consumes the channel
//! messages and spawns new tasks for them.
//!
//! For example:
//!
//! ```
//! # extern crate tokio;
//! # extern crate futures;
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//! use futures::sync::mpsc;
//!
//! # fn main() {
//! let mut runtime = Runtime::new().unwrap();
//! let (tx, rx) = mpsc::channel(128);
//! # tx.send(future::ok(()));
//!
//! runtime.spawn(rx.for_each(|task| {
//! tokio::spawn(task);
//! Ok(())
//! }).map_err(|e| panic!("channel error")));
//!
//! # /*
//! runtime.run().unwrap();
//! # */
//! # }
//! ```
//!
//! # Examples
//!
//! Creating a new `Runtime` and running a future `f` until its completion and
//! returning its result.
//!
//! ```
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//!
//! let mut runtime = Runtime::new().unwrap();
//!
//! // Use the runtime...
//! // runtime.block_on(f); // where f is a future
//! ```
//!
//! [rt]: struct.Runtime.html
//! [concurrent-rt]: ../struct.Runtime.html
//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html

mod runtime;

pub use self::runtime::Runtime;
149 changes: 149 additions & 0 deletions src/runtime/current_thread/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use executor::current_thread::{self, CurrentThread};

use tokio_reactor::{self, Reactor};
use tokio_timer::timer::{self, Timer};
use tokio_executor;

use futures::Future;

use std::io;

/// Single-threaded runtime provides a way to start reactor
/// and executor on the current thread.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
#[derive(Debug)]
pub struct Runtime {
reactor_handle: tokio_reactor::Handle,
timer_handle: timer::Handle,
executor: CurrentThread<Timer<Reactor>>,
}

/// Error returned by the `run` function.
#[derive(Debug)]
pub struct RunError {
inner: current_thread::RunError,
}

impl Runtime {
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
// We need a reactor to receive events about IO objects from kernel
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();

// Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
// reactor pick up some new external events.
let timer = Timer::new(reactor);
let timer_handle = timer.handle();

// And now put a single-threaded executor on top of the timer. When there are no futures ready
// to do something, it'll let the timer or the reactor to generate some new stimuli for the
// futures to continue in their life.
let executor = CurrentThread::new_with_park(timer);

let runtime = Runtime { reactor_handle, timer_handle, executor };
Ok(runtime)
}

/// Spawn a future onto the single-threaded Tokio runtime.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```rust
/// # extern crate tokio;
/// # extern crate futures;
/// # use futures::{future, Future, Stream};
/// use tokio::runtime::current_thread::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|| {
/// println!("running on the runtime");
/// Ok(())
/// }));
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where F: Future<Item = (), Error = ()> + 'static,
{
self.executor.spawn(future);
self
}

/// Runs the provided future, blocking the current thread until the future
/// completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function will **also** execute any spawned futures on the
/// current thread, but will **not** block until these other spawned futures
/// have completed. Once the function returns, any uncompleted futures
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again.
///
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where F: Future
{
self.enter(|executor| {
// Run the provided future
let ret = executor.block_on(f);
ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
})
}

/// Run the executor to completion, blocking the thread until **all**
/// spawned futures have completed.
pub fn run(&mut self) -> Result<(), RunError> {
self.enter(|executor| executor.run())
.map_err(|e| RunError {
inner: e,
})
}

fn enter<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R
{
let Runtime { ref reactor_handle, ref timer_handle, ref mut executor } = *self;

// Binds an executor to this thread
let mut enter = tokio_executor::enter().expect("Multiple executors at once");

// This will set the default handle and timer to use inside the closure
// and run the future.
tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| {
timer::with_default(&timer_handle, enter, |enter| {
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, enter, |enter| {
let mut executor = executor.enter(enter);
f(&mut executor)
})
})
})
}
}
1 change: 1 addition & 0 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
//! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html

mod builder;
pub mod current_thread;
mod shutdown;
mod task_executor;

Expand Down
78 changes: 51 additions & 27 deletions tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,60 @@ macro_rules! t {
})
}

fn create_client_server_future() -> Box<Future<Item=(), Error=()> + Send> {
let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
let addr = t!(server.local_addr());
let client = TcpStream::connect(&addr);

let server = server.incoming().take(1)
.map_err(|e| panic!("accept err = {:?}", e))
.for_each(|socket| {
tokio::spawn({
io::write_all(socket, b"hello")
.map(|_| ())
.map_err(|e| panic!("write err = {:?}", e))
})
})
.map(|_| ());

let client = client
.map_err(|e| panic!("connect err = {:?}", e))
.and_then(|client| {
// Read all
io::read_to_end(client, vec![])
.map(|_| ())
.map_err(|e| panic!("read err = {:?}", e))
});

let future = server.join(client)
.map(|_| ());
Box::new(future)
}

#[test]
fn basic_runtime_usage() {
fn runtime_tokio_run() {
let _ = env_logger::init();

tokio::run({
let server = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
let addr = t!(server.local_addr());
let client = TcpStream::connect(&addr);

let server = server.incoming().take(1)
.map_err(|e| panic!("accept err = {:?}", e))
.for_each(|socket| {
tokio::spawn({
io::write_all(socket, b"hello")
.map(|_| ())
.map_err(|e| panic!("write err = {:?}", e))
})
})
.map(|_| ());
tokio::run(create_client_server_future());
}

let client = client
.map_err(|e| panic!("connect err = {:?}", e))
.and_then(|client| {
// Read all
io::read_to_end(client, vec![])
.map(|_| ())
.map_err(|e| panic!("read err = {:?}", e))
});
#[test]
fn runtime_single_threaded() {
let _ = env_logger::init();

let mut runtime = tokio::runtime::current_thread::Runtime::new()
.unwrap();
runtime.block_on(create_client_server_future()).unwrap();
runtime.run().unwrap();
}

#[test]
fn runtime_multi_threaded() {
let _ = env_logger::init();

server.join(client)
.map(|_| ())
});
let mut runtime = tokio::runtime::Builder::new()
.build()
.unwrap();
runtime.spawn(create_client_server_future());
runtime.shutdown_on_idle().wait().unwrap();
}

0 comments on commit 2465483

Please sign in to comment.