Skip to content

Commit

Permalink
feat(mpz-common): Context::blocking (#141)
Browse files Browse the repository at this point in the history
* feat(mpz-common): Context::blocking

* Apply suggestions from code review

Co-authored-by: dan <themighty1@users.noreply.github.com>

---------

Co-authored-by: dan <themighty1@users.noreply.github.com>
  • Loading branch information
sinui0 and themighty1 committed May 31, 2024
1 parent 8f0b298 commit 9b51bd4
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 50 deletions.
1 change: 1 addition & 0 deletions crates/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ futures-util = "0.3"
tokio = "1.23"
tokio-util = "0.7"
scoped-futures = "0.1.3"
pollster = "0.3"

# serialization
ark-serialize = "0.4"
Expand Down
11 changes: 11 additions & 0 deletions crates/mpz-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ default = ["sync"]
sync = []
test-utils = ["uid-mux/test-utils"]
ideal = []
rayon = ["dep:rayon"]
force-st = []

[dependencies]
mpz-core.workspace = true
Expand All @@ -19,6 +21,9 @@ thiserror.workspace = true
serio.workspace = true
uid-mux.workspace = true
serde = { workspace = true, features = ["derive"] }
pollster.workspace = true
rayon = { workspace = true, optional = true }
cfg-if.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = [
Expand All @@ -29,3 +34,9 @@ tokio = { workspace = true, features = [
tokio-util = { workspace = true, features = ["compat"] }
uid-mux = { workspace = true, features = ["test-utils"] }
tracing-subscriber = { workspace = true, features = ["fmt"] }
criterion.workspace = true

[[bench]]
name = "context"
harness = false
required-features = ["test-utils", "rayon"]
53 changes: 53 additions & 0 deletions crates/mpz-common/benches/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use mpz_common::{
executor::{test_mt_executor, test_st_executor},
Context,
};
use pollster::block_on;
use scoped_futures::ScopedFutureExt;

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("context");

// Measures the overhead of making a `Context::blocking` call, which
// moves the context to a worker thread and back.
group.bench_function("st/blocking", |b| {
let (mut ctx, _) = test_st_executor(1024);
b.iter(|| {
block_on(async {
ctx.blocking(|ctx| {
async move {
black_box(ctx.id());
}
.scope_boxed()
})
.await
.unwrap();
});
})
});

// Measures the overhead of making a `Context::blocking` call, which
// moves the context to a worker thread and back.
group.bench_function("mt/blocking", |b| {
let (mut exec_a, _) = test_mt_executor(8);

let mut ctx = block_on(exec_a.new_thread()).unwrap();

b.iter(|| {
block_on(async {
ctx.blocking(|ctx| {
async move {
black_box(ctx.id());
}
.scope_boxed()
})
.await
.unwrap();
});
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
48 changes: 35 additions & 13 deletions crates/mpz-common/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ pub trait Context: Send + Sync {
/// Returns a mutable reference to the thread's I/O channel.
fn io_mut(&mut self) -> &mut Self::Io;

/// Executes a task that may block the thread.
///
/// If CPU multi-threading is available, the task is executed on a separate thread. Otherwise,
/// the task is executed on the current thread and can block the executor.
///
/// # Deadlocks
///
/// This method may cause deadlocks if the task blocks and the executor can not make progress.
/// Generally, one should *never* block across an await point. This method is intended for operations
/// that are CPU-bound but require access to a thread context.
///
/// # Overhead
///
/// This method has an inherent overhead and should only be used for tasks that are CPU-bound. Otherwise,
/// prefer using [`Context::queue`] or [`Context::join`] to execute tasks concurrently.
async fn blocking<F, R>(&mut self, f: F) -> Result<R, ContextError>
where
F: for<'a> FnOnce(&'a mut Self) -> ScopedBoxFuture<'static, 'a, R> + Send + 'static,
R: Send + 'static;

/// Forks the thread and executes the provided closures concurrently.
///
/// Implementations may not be able to fork, in which case the closures are executed
Expand Down Expand Up @@ -119,32 +139,34 @@ macro_rules! try_join {

#[cfg(test)]
mod tests {
use crate::executor::test_st_executor;
use crate::{executor::test_st_executor, Context};
use futures::executor::block_on;

#[test]
fn test_join_macro() {
let (mut ctx, _) = test_st_executor(1);

futures::executor::block_on(async {
join!(ctx, async { println!("{:?}", ctx.id()) }, async {
println!("{:?}", ctx.id())
})
.unwrap()
let (id_0, id_1) = block_on(async {
join!(ctx, async { ctx.id().clone() }, async { ctx.id().clone() }).unwrap()
});

assert_eq!(&id_0, ctx.id());
assert_eq!(&id_1, ctx.id());
}

#[test]
fn test_try_join_macro() {
let (mut ctx, _) = test_st_executor(1);

futures::executor::block_on(async {
try_join!(
ctx,
async { Ok::<_, ()>(println!("{:?}", ctx.id())) },
async { Ok::<_, ()>(println!("{:?}", ctx.id())) }
)
let (id_0, id_1) = block_on(async {
try_join!(ctx, async { Ok::<_, ()>(ctx.id().clone()) }, async {
Ok::<_, ()>(ctx.id().clone())
})
.unwrap()
.unwrap()
.unwrap();
});

assert_eq!(&id_0, ctx.id());
assert_eq!(&id_1, ctx.id());
}
}
122 changes: 122 additions & 0 deletions crates/mpz-common/src/cpu.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! CPU backend shim.

use cfg_if::cfg_if;

cfg_if! {
if #[cfg(feature = "force-st")] {
pub use st::SingleThreadedBackend as CpuBackend;
} else if #[cfg(feature = "rayon")] {
pub use rayon_backend::RayonBackend as CpuBackend;
} else {
pub use st::SingleThreadedBackend as CpuBackend;
}
}

#[cfg(any(feature = "force-st", not(feature = "rayon")))]
mod st {
use futures::Future;

/// A single-threaded CPU backend.
#[derive(Debug)]
pub struct SingleThreadedBackend;

impl SingleThreadedBackend {
/// Executes a future on the CPU backend.
#[inline]
pub fn blocking_async<F>(fut: F) -> impl Future<Output = F::Output> + Send
where
F: Future + Send + 'static,
F::Output: Send,
{
fut
}

/// Executes a closure on the CPU backend.
#[inline]
pub fn blocking<F, R>(f: F) -> impl Future<Output = R> + Send
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
async move { f() }
}
}

#[cfg(test)]
mod tests {
use super::*;
use pollster::block_on;

#[test]
fn test_st_backend_blocking() {
let output = block_on(SingleThreadedBackend::blocking(|| 42));
assert_eq!(output, 42);
}

#[test]
fn test_st_backend_blocking_async() {
let output = block_on(SingleThreadedBackend::blocking_async(async { 42 }));
assert_eq!(output, 42);
}
}
}

#[cfg(all(feature = "rayon", not(feature = "force-st")))]
mod rayon_backend {
use futures::{channel::oneshot, Future};
use pollster::block_on;

/// A Rayon CPU backend.
#[derive(Debug)]
pub struct RayonBackend;

impl RayonBackend {
/// Executes a future on the CPU backend.
pub fn blocking_async<F>(fut: F) -> impl Future<Output = F::Output> + Send
where
F: Future + Send + 'static,
F::Output: Send,
{
async move {
let (sender, receiver) = oneshot::channel();
rayon::spawn(move || {
let output = block_on(fut);
_ = sender.send(output);
});
receiver.await.expect("worker thread does not drop channel")
}
}

/// Executes a closure on the CPU backend.
pub fn blocking<F, R>(f: F) -> impl Future<Output = R> + Send
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
async move {
let (sender, receiver) = oneshot::channel();
rayon::spawn(move || {
_ = sender.send(f());
});
receiver.await.expect("worker thread does not drop channel")
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_rayon_backend_blocking() {
let output = block_on(RayonBackend::blocking(|| 42));
assert_eq!(output, 42);
}

#[test]
fn test_rayon_backend_blocking_async() {
let output = block_on(RayonBackend::blocking_async(async { 42 }));
assert_eq!(output, 42);
}
}
}
16 changes: 15 additions & 1 deletion crates/mpz-common/src/executor/dummy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use async_trait::async_trait;

use scoped_futures::ScopedBoxFuture;
use serio::{Sink, Stream};

use crate::{context::Context, ContextError, ThreadId};
use crate::{context::Context, cpu::CpuBackend, ContextError, ThreadId};

/// A dummy executor.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -74,6 +75,19 @@ impl Context for DummyExecutor {
&mut self.io
}

async fn blocking<F, R>(&mut self, f: F) -> Result<R, ContextError>
where
F: for<'a> FnOnce(&'a mut Self) -> ScopedBoxFuture<'static, 'a, R> + Send + 'static,
R: Send + 'static,
{
let mut ctx = Self {
id: self.id.clone(),
io: DummyIo,
};

Ok(CpuBackend::blocking_async(async move { f(&mut ctx).await }).await)
}

async fn join<'a, A, B, RA, RB>(&'a mut self, a: A, b: B) -> Result<(RA, RB), ContextError>
where
A: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, RA> + Send + 'a,
Expand Down
2 changes: 1 addition & 1 deletion crates/mpz-common/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod mt;
mod st;

pub use dummy::{DummyExecutor, DummyIo};
pub use mt::MTExecutor;
pub use mt::{MTContext, MTExecutor};
pub use st::STExecutor;

#[cfg(any(test, feature = "test-utils"))]
Expand Down
Loading

0 comments on commit 9b51bd4

Please sign in to comment.