This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF: Remove rayon and some uses of tokio #7153

Merged
merged 16 commits on May 16, 2023
26 changes: 18 additions & 8 deletions node/core/pvf/worker/src/common.rs
@@ -18,15 +18,12 @@ use crate::LOG_TARGET;
use cpu_time::ProcessTime;
use futures::never::Never;
use std::{
any::Any,
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
time::Duration,
};
use tokio::{
io,
net::UnixStream,
runtime::{Handle, Runtime},
};
use tokio::{io, net::UnixStream, runtime::Runtime};

/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
@@ -44,7 +41,7 @@ pub fn worker_event_loop<F, Fut>(
node_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(Handle, UnixStream) -> Fut,
F: FnMut(UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
let worker_pid = std::process::id();
@@ -68,13 +65,12 @@ pub fn worker_event_loop<F, Fut>(

// Run the main worker loop.
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Contributor:

Why do we need tokio and futures if everything async-related is already purged? Filesystem interactions are sync in nature, and for the worker, reading from the socket is blocking too.

Contributor (author):

You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.

Note that we still need to remove the dependencies on polkadot-node-core-pvf and tracing-gum to fully remove the dependency on tokio.

Contributor:

How's tracing-gum related to tokio?

Contributor (author):

I just ran cargo tree -e normal in the crate and saw tokio several times in the output, e.g. under sc-network and libp2p crates. I have no idea how tracing-gum works though.

Contributor:

It's jaeger that pulls tokio in, even though gum only uses hashing from jaeger.

> You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.

Better to polish the rest of the code and properly synchronize threads, and take care of removing tokio later (if it's possible).
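
For illustration, a fully blocking worker loop along the lines suggested here could look roughly like the sketch below. It uses std's UnixStream and a made-up length-prefixed framing; the function name and framing are assumptions for the sketch, not what this PR implements:

use std::{
    io::{self, Read, Write},
    os::unix::net::UnixStream,
};

// Hypothetical sketch: connect synchronously and service requests without any async runtime.
fn blocking_worker_loop(socket_path: &str) -> io::Result<()> {
    let mut stream = UnixStream::connect(socket_path)?;
    let _ = std::fs::remove_file(socket_path);

    loop {
        // Read a 4-byte little-endian length prefix, then the frame body.
        let mut len_buf = [0u8; 4];
        stream.read_exact(&mut len_buf)?;
        let len = u32::from_le_bytes(len_buf) as usize;
        let mut frame = vec![0u8; len];
        stream.read_exact(&mut frame)?;

        // A real worker would decode and execute the request here; this sketch
        // just echoes the frame back.
        stream.write_all(&len_buf)?;
        stream.write_all(&frame)?;
    }
}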

.block_on(async move {
let stream = UnixStream::connect(socket_path).await?;
let _ = tokio::fs::remove_file(socket_path).await;

let result = event_loop(handle.clone(), stream).await;
let result = event_loop(stream).await;

result
})
@@ -124,6 +120,20 @@ pub fn cpu_time_monitor_loop(
}
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
pub fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unknown panic payload".to_string(),
},
}
}
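
A quick illustration of how this helper pairs with thread::join, whose error side is exactly Box<dyn Any + Send + 'static> (hedged example; the panic message is just a placeholder):

let handle = std::thread::spawn(|| panic!("boom"));
let result = handle.join().map_err(stringify_panic_payload);
assert_eq!(result.unwrap_err(), "boom");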

/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
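The body of that function is collapsed above. A rough sketch of such an emergency teardown, assuming the worker is a direct child of the node process and the libc crate is available (hypothetical name and shape, not code from this PR):

fn kill_node_in_emergency() {
    unsafe {
        // SAFETY: getppid() and kill() are plain libc calls with no Rust-side invariants.
        let node_pid = libc::getppid();
        if node_pid > 1 {
            let _ = libc::kill(node_pid, libc::SIGTERM);
        }
    }
}
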
80 changes: 48 additions & 32 deletions node/core/pvf/worker/src/execute.rs
@@ -15,20 +15,20 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
executor_intf::Executor,
common::{bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload, worker_event_loop},
executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE},
LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf::{
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
};
use polkadot_parachain::primitives::ValidationResult;
use std::{
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
sync::mpsc::channel,
thread,
time::Duration,
};
use tokio::{io, net::UnixStream};
@@ -72,13 +72,13 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
worker_event_loop("execute", socket_path, node_version, |mut stream| async move {
let worker_pid = std::process::id();

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
let executor = Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
})?;

loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
@@ -94,26 +94,37 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
let cpu_time_start = ProcessTime::now();

// Spawn a new thread that runs the CPU time monitor.
let cpu_time_monitor_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
})
.fuse();
let cpu_time_monitor_thread = thread::spawn(move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
});
let executor_2 = executor.clone();
let execute_fut = rt_handle
.spawn_blocking(move || {
let execute_thread =
thread::Builder::new().stack_size(EXECUTE_THREAD_STACK_SIZE).spawn(move || {
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
})
.fuse();

pin_mut!(cpu_time_monitor_fut);
pin_mut!(execute_fut);

let response = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will
})?;

// "Select" the first thread that finishes by checking all threads in a naive loop.
let response = loop {
// NOTE: The order in which we poll threads is important! This loop sleeps between
// polling, so it is possible to go over the timeout even when the worker thread
// finishes on time. So we want to prioritize selecting the worker thread and not
// the CPU thread. If the measured CPU time is over the timeout, we will reject the
// candidate later on the host-side. This avoids a source of indeterminism, where a
// job can trigger timeouts on some validators and not others.
if execute_thread.is_finished() {
let _ = finished_tx.send(());
break execute_thread.join().unwrap_or_else(|e| {
// TODO: Use `Panic` error once that is implemented.
Contributor:

Maybe have an issue for this, rather than a TODO in the code?

Contributor (author):

I already addressed it here; it's approved, so I'll merge it right after this PR. (I had planned to do these changes right after one another, so I left the TODO as a marker for myself, did the change, and set 7155's merge target to this branch. Will do issues instead in the future. 👍)

Response::format_internal(
"execute thread error",
&stringify_panic_payload(e),
)
})
}
// If this thread is not selected, the join handle is dropped and the thread will
// finish in the background.
cpu_time_monitor_res = cpu_time_monitor_fut => {
match cpu_time_monitor_res {
if cpu_time_monitor_thread.is_finished() {
break match cpu_time_monitor_thread.join() {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
Expand All @@ -125,14 +136,19 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
);
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
Ok(None) => Response::InternalError(
"error communicating over finished channel".into(),
),
// We can use an internal error here because errors in this thread are
// independent of the candidate.
Err(e) => Response::format_internal(
"cpu time monitor thread error",
&stringify_panic_payload(e),
),
}
},
execute_res = execute_fut => {
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
},
}

thread::sleep(Duration::from_millis(10));
};

send_response(&mut stream, response).await?;
@@ -143,7 +159,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
executor: Arc<Executor>,
executor: Executor,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
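For reference, the thread-polling "select" introduced above boils down to the following standalone pattern. The closures, timings, and function name are placeholders; the real loop prioritizes the execute thread for the determinism reasons explained in the NOTE:

use std::{thread, time::Duration};

fn select_first_finished() -> String {
    // Stands in for the execute thread running validate_using_artifact.
    let execute = thread::spawn(|| {
        thread::sleep(Duration::from_millis(50));
        "job done".to_string()
    });
    // Stands in for cpu_time_monitor_loop; Some(..) would mean the CPU timeout was hit.
    let monitor = thread::spawn(|| {
        thread::sleep(Duration::from_millis(200));
        Some(Duration::from_millis(200))
    });

    loop {
        // Check the execute thread first, so the sleep between polls cannot turn a
        // job that finished on time into a timeout.
        if execute.is_finished() {
            break execute.join().unwrap_or_else(|_| "execute thread panicked".to_string());
        }
        if monitor.is_finished() {
            break "timed out".to_string();
        }
        thread::sleep(Duration::from_millis(10));
    }
}
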
137 changes: 57 additions & 80 deletions node/core/pvf/worker/src/executor_intf.rs
@@ -29,6 +29,42 @@ use std::{
path::Path,
};

// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
// that the stack space is actually available.
//
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
// guard page and the Rust stack overflow handler will be triggered. That leads to an
// **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
// will abort and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
//
// Hence we need to increase it. The simplest way to fix that is to spawn a thread with the desired
// stack limit.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
/// The stack size for the execute thread.
pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;

// Memory configuration
//
// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate
@@ -142,60 +178,17 @@ fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, Strin
Ok(sem)
}

#[derive(Clone)]
pub struct Executor {
thread_pool: rayon::ThreadPool,
config: Config,
}

impl Executor {
pub fn new(params: ExecutorParams) -> Result<Self, String> {
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
//
// Also, we configure the executor to provide the deterministic stack and that requires
// supplying the amount of the native stack space that wasm is allowed to use. This is
// realized by supplying the limit into `wasmtime::Config::max_wasm_stack`.
//
// There are quirks to that configuration knob:
//
// 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check
// that the stack space is actually available.
//
// That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes
// more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the
// guard page and the Rust stack overflow handler will be triggered. That leads to an
// **abort**.
//
// 2. It cannot and does not limit the stack space consumed by Rust code.
//
// Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code
// will abort and that will abort the process as well.
//
// Typically on Linux the main thread gets the stack size specified by the `ulimit` and
// typically it's configured to 8 MiB. Rust's spawned threads are 2 MiB. OTOH, the
// NATIVE_STACK_MAX is set to 256 MiB. Not nearly enough.
//
// Hence we need to increase it.
//
// The simplest way to fix that is to spawn a thread with the desired stack limit. In order
// to avoid costs of creating a thread, we use a thread pool. The execution is
// single-threaded hence the thread pool has only one thread.
//
// The reasoning why we pick this particular size is:
//
// The default Rust thread stack limit 2 MiB + 256 MiB wasm stack.
let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize;
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.stack_size(thread_stack_size)
.build()
.map_err(|e| format!("Failed to create thread pool: {:?}", e))?;

let mut config = DEFAULT_CONFIG.clone();
config.semantics = params_to_wasmtime_semantics(&params)?;

Ok(Self { thread_pool, config })
Ok(Self { config })
}

/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
@@ -212,47 +205,31 @@ impl Executor {
///
/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution.
pub unsafe fn execute(
&self,
self,
compiled_artifact_path: &Path,
params: &[u8],
) -> Result<Vec<u8>, String> {
let mut result = None;
self.thread_pool.scope({
let result = &mut result;
move |s| {
s.spawn(move |_| {
// spawn does not return a value, so we need to use a variable to pass the result.
*result = Some(
do_execute(compiled_artifact_path, self.config.clone(), params)
.map_err(|err| format!("execute error: {:?}", err)),
);
});
}
});
result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string()))
let mut extensions = sp_externalities::Extensions::new();

extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));

let mut ext = ValidationExternalities(extensions);

match sc_executor::with_externalities_safe(&mut ext, || {
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
compiled_artifact_path,
self.config,
)?;
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
}) {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(err)) => Err(err),
Err(err) => Err(err),
}
.map_err(|err| format!("execute error: {:?}", err))
}
}

unsafe fn do_execute(
compiled_artifact_path: &Path,
config: Config,
params: &[u8],
) -> Result<Vec<u8>, sc_executor_common::error::Error> {
let mut extensions = sp_externalities::Extensions::new();

extensions.register(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion));

let mut ext = ValidationExternalities(extensions);

sc_executor::with_externalities_safe(&mut ext, || {
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
compiled_artifact_path,
config,
)?;
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
})?
}

type HostFunctions = (
sp_io::misc::HostFunctions,
sp_io::crypto::HostFunctions,