Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mpz-common): multi-threaded executor #136

Merged
merged 9 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions crates/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ mpz-share-conversion-core = { path = "mpz-share-conversion-core" }
clmul = { path = "clmul" }
matrix-transpose = { path = "matrix-transpose" }

tlsn-utils = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "8f2fc9e" }
tlsn-utils-aio = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "8f2fc9e" }
tlsn-utils = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "6e0be94" }
tlsn-utils-aio = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "6e0be94" }

# rand
rand_chacha = "0.3"
Expand Down Expand Up @@ -83,7 +83,10 @@ prost-build = "0.9"
bytes = "1"
yamux = "0.10"
bytemuck = { version = "1.13", features = ["derive"] }
serio = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "8f2fc9e" }
serio = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "6e0be94" }

# io
uid-mux = { git = "https://github.com/tlsnotary/tlsn-utils", rev = "6e0be94" }

# testing
prost = "0.9"
Expand Down
13 changes: 12 additions & 1 deletion crates/mpz-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
[features]
default = ["sync"]
sync = []
test-utils = []
test-utils = ["uid-mux/test-utils"]
ideal = []

[dependencies]
Expand All @@ -17,4 +17,15 @@ async-trait.workspace = true
scoped-futures.workspace = true
thiserror.workspace = true
serio.workspace = true
uid-mux.workspace = true
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
tokio = { workspace = true, features = [
"io-util",
"macros",
"rt-multi-thread",
] }
tokio-util = { workspace = true, features = ["compat"] }
uid-mux = { workspace = true, features = ["test-utils"] }
tracing-subscriber = { workspace = true, features = ["fmt"] }
85 changes: 61 additions & 24 deletions crates/mpz-common/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,68 @@
use core::fmt;

use async_trait::async_trait;

use scoped_futures::ScopedBoxFuture;
use serio::{IoSink, IoStream};

use crate::ThreadId;

/// An error for types that implement [`Context`].
#[derive(Debug, thiserror::Error)]
#[error("context error: {kind}")]
pub struct ContextError {
kind: ErrorKind,
#[source]
source: Option<Box<dyn std::error::Error + Send + Sync>>,
}

impl ContextError {
pub(crate) fn new<E: Into<Box<dyn std::error::Error + Send + Sync>>>(
kind: ErrorKind,
source: E,
) -> Self {
Self {
kind,
source: Some(source.into()),
}
}
}

#[derive(Debug)]
pub(crate) enum ErrorKind {
Mux,
Thread,
}

impl fmt::Display for ErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ErrorKind::Mux => write!(f, "multiplexer error"),
ErrorKind::Thread => write!(f, "thread error"),
}
}
}

/// A thread context.
#[async_trait]
pub trait Context: Send {
/// The type of I/O channel used by the thread.
pub trait Context: Send + Sync {
/// I/O channel used by the thread.
type Io: IoSink + IoStream + Send + Unpin + 'static;

/// Returns the thread ID.
fn id(&self) -> &ThreadId;

/// Returns the maximum available concurrency.
fn max_concurrency(&self) -> usize;

/// Returns a mutable reference to the thread's I/O channel.
fn io_mut(&mut self) -> &mut Self::Io;

/// Forks the thread and executes the provided closures concurrently.
///
/// Implementations may not be able to fork, in which case the closures are executed
/// sequentially.
async fn join<'a, A, B, RA, RB>(&'a mut self, a: A, b: B) -> (RA, RB)
async fn join<'a, A, B, RA, RB>(&'a mut self, a: A, b: B) -> Result<(RA, RB), ContextError>
where
A: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, RA> + Send + 'a,
B: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, RB> + Send + 'a,
Expand All @@ -36,7 +77,11 @@ pub trait Context: Send {
///
/// Implementations may not be able to fork, in which case the closures are executed
/// sequentially.
async fn try_join<'a, A, B, RA, RB, E>(&'a mut self, a: A, b: B) -> Result<(RA, RB), E>
async fn try_join<'a, A, B, RA, RB, E>(
&'a mut self,
a: A,
b: B,
) -> Result<Result<(RA, RB), E>, ContextError>
where
A: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, Result<RA, E>> + Send + 'a,
B: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, Result<RB, E>> + Send + 'a,
Expand All @@ -50,17 +95,12 @@ pub trait Context: Send {
/// This macro calls `Context::join` under the hood.
#[macro_export]
macro_rules! join {
($ctx:ident, $task_0:expr, $task_1:expr) => {
async {
use $crate::{scoped_futures::ScopedFutureExt, Context};
$ctx.join(
|$ctx| async { $task_0.await }.scope_boxed(),
|$ctx| async { $task_1.await }.scope_boxed(),
)
($ctx:ident, $task_0:expr, $task_1:expr) => {{
#[allow(unused_imports)]
use $crate::{scoped_futures::ScopedFutureExt, Context};
$ctx.join(|$ctx| $task_0.scope_boxed(), |$ctx| $task_1.scope_boxed())
.await
}
.await
};
}};
}

/// A convenience macro for forking a context and joining two tasks concurrently, returning an error
Expand All @@ -69,17 +109,12 @@ macro_rules! join {
/// This macro calls `Context::try_join` under the hood.
#[macro_export]
macro_rules! try_join {
($ctx:ident, $task_0:expr, $task_1:expr) => {
async {
use $crate::{scoped_futures::ScopedFutureExt, Context};
$ctx.try_join(
|$ctx| async { $task_0.await }.scope_boxed(),
|$ctx| async { $task_1.await }.scope_boxed(),
)
($ctx:ident, $task_0:expr, $task_1:expr) => {{
#[allow(unused_imports)]
use $crate::{scoped_futures::ScopedFutureExt, Context};
$ctx.try_join(|$ctx| $task_0.scope_boxed(), |$ctx| $task_1.scope_boxed())
.await
}
.await
};
}};
}

#[cfg(test)]
Expand All @@ -94,6 +129,7 @@ mod tests {
join!(ctx, async { println!("{:?}", ctx.id()) }, async {
println!("{:?}", ctx.id())
})
.unwrap()
});
}

Expand All @@ -107,6 +143,7 @@ mod tests {
async { Ok::<_, ()>(println!("{:?}", ctx.id())) },
async { Ok::<_, ()>(println!("{:?}", ctx.id())) }
)
.unwrap()
.unwrap();
});
}
Expand Down
30 changes: 21 additions & 9 deletions crates/mpz-common/src/executor/dummy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use async_trait::async_trait;

use scoped_futures::ScopedBoxFuture;
use serio::{Sink, Stream};

use crate::{context::Context, ThreadId};
use crate::{context::Context, ContextError, ThreadId};

/// A dummy executor.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -67,11 +66,15 @@ impl Context for DummyExecutor {
&self.id
}

fn max_concurrency(&self) -> usize {
1
}

fn io_mut(&mut self) -> &mut Self::Io {
&mut self.io
}

async fn join<'a, A, B, RA, RB>(&'a mut self, a: A, b: B) -> (RA, RB)
async fn join<'a, A, B, RA, RB>(&'a mut self, a: A, b: B) -> Result<(RA, RB), ContextError>
where
A: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, RA> + Send + 'a,
B: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, RB> + Send + 'a,
Expand All @@ -80,20 +83,28 @@ impl Context for DummyExecutor {
{
let a = a(self).await;
let b = b(self).await;
(a, b)
Ok((a, b))
}

async fn try_join<'a, A, B, RA, RB, E>(&'a mut self, a: A, b: B) -> Result<(RA, RB), E>
async fn try_join<'a, A, B, RA, RB, E>(
&'a mut self,
a: A,
b: B,
) -> Result<Result<(RA, RB), E>, ContextError>
where
A: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, Result<RA, E>> + Send + 'a,
B: for<'b> FnOnce(&'b mut Self) -> ScopedBoxFuture<'a, 'b, Result<RB, E>> + Send + 'a,
RA: Send + 'a,
RB: Send + 'a,
E: Send + 'a,
{
let a = a(self).await?;
let b = b(self).await?;
Ok((a, b))
let try_join = |a: A, b: B| async move {
let a = a(self).await?;
let b = b(self).await?;
Ok((a, b))
};

Ok(try_join(a, b).await)
}
}

Expand Down Expand Up @@ -130,7 +141,8 @@ mod tests {
.scope_boxed()
},
)
.await;
.await
.unwrap();

// Make sure we can mutate the fields after borrowing them in the async closures.
self.a = ThreadId::default();
Expand Down
20 changes: 20 additions & 0 deletions crates/mpz-common/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
//! Executors.

mod dummy;
mod mt;
mod st;

pub use dummy::{DummyExecutor, DummyIo};
pub use mt::MTExecutor;
pub use st::STExecutor;

#[cfg(any(test, feature = "test-utils"))]
mod test_utils {
use serio::channel::{duplex, MemoryDuplex};
use uid_mux::test_utils::{test_framed_mux, TestFramedMux};

use super::*;

Expand All @@ -20,6 +23,23 @@ mod test_utils {

(STExecutor::new(io_0), STExecutor::new(io_1))
}

/// Test multi-threaded executor.
pub type TestMTExecutor = MTExecutor<TestFramedMux>;

/// Creates a pair of multi-threaded executors with yamux I/O channels.
///
/// # Arguments
///
/// * `io_buffer` - The size of the I/O buffer (channel capacity).
pub fn test_mt_executor(io_buffer: usize) -> (TestMTExecutor, TestMTExecutor) {
let (mux_0, mux_1) = test_framed_mux(io_buffer);

let exec_0 = MTExecutor::new(mux_0, 8);
let exec_1 = MTExecutor::new(mux_1, 8);

(exec_0, exec_1)
}
}

#[cfg(any(test, feature = "test-utils"))]
Expand Down
Loading