Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF: Remove rayon and some uses of tokio #7153

Merged
merged 16 commits into from May 16, 2023
46 changes: 37 additions & 9 deletions node/core/pvf/worker/src/common.rs
Expand Up @@ -18,15 +18,15 @@ use crate::LOG_TARGET;
use cpu_time::ProcessTime;
use futures::never::Never;
use std::{
any::Any,
path::PathBuf,
sync::mpsc::{Receiver, RecvTimeoutError},
sync::{
mpsc::{Receiver, RecvTimeoutError},
Arc, Condvar, Mutex,
},
time::Duration,
};
use tokio::{
io,
net::UnixStream,
runtime::{Handle, Runtime},
};
use tokio::{io, net::UnixStream, runtime::Runtime};

/// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the
/// child process.
Expand All @@ -44,7 +44,7 @@ pub fn worker_event_loop<F, Fut>(
node_version: Option<&str>,
mut event_loop: F,
) where
F: FnMut(Handle, UnixStream) -> Fut,
F: FnMut(UnixStream) -> Fut,
Fut: futures::Future<Output = io::Result<Never>>,
{
let worker_pid = std::process::id();
Expand All @@ -68,13 +68,12 @@ pub fn worker_event_loop<F, Fut>(

// Run the main worker loop.
let rt = Runtime::new().expect("Creates tokio runtime. If this panics the worker will die and the host will detect that and deal with it.");
let handle = rt.handle();
let err = rt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need tokio and futures if everything async-related is already purged? Filesystem interactions are sync in nature, and for the worker reading from the socket is blocking too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.

Note that we still need to remove the dependencies on polkadot-node-core-pvf and tracing-gum to fully remove the dependency on tokio.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How's tracing-gum related to tokio?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just ran cargo tree -e normal in the crate and saw tokio several times in the output, e.g. under sc-network and libp2p crates. I have no idea how tracing-gum works though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's jaeger, even though gum only uses hashing from it.

You're right! I just didn't remove the rest of async to keep this PR focused. But I can do it here if you want.

Better to polish the rest of the code and properly synchronize threads and take care of removing tokio later (if it's possible)

.block_on(async move {
let stream = UnixStream::connect(socket_path).await?;
let _ = tokio::fs::remove_file(socket_path).await;

let result = event_loop(handle.clone(), stream).await;
let result = event_loop(stream).await;

result
})
Expand Down Expand Up @@ -124,6 +123,35 @@ pub fn cpu_time_monitor_loop(
}
}

/// Attempt to convert an opaque panic payload to a string.
///
/// This is a best effort, and is not guaranteed to provide the most accurate value.
pub fn stringify_panic_payload(payload: Box<dyn Any + Send + 'static>) -> String {
match payload.downcast::<&'static str>() {
Ok(msg) => msg.to_string(),
Err(payload) => match payload.downcast::<String>() {
Ok(msg) => *msg,
// At least we tried...
Err(_) => "unknown panic payload".to_string(),
},
}
}

/// Helper function to notify the thread waiting on this condvar. This follows the conventions in
/// the std docs, including the use of a `pending` flag.
pub fn cond_notify_all(cond: Arc<(Mutex<bool>, Condvar)>) {
let (lock, cvar) = &*cond;
let mut pending = lock.lock().unwrap();
*pending = false;
cvar.notify_all();
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
}

/// Helper function to block the thread while it waits on the condvar.
pub fn cond_wait_while(cond: Arc<(Mutex<bool>, Condvar)>) {
let (lock, cvar) = &*cond;
let _guard = cvar.wait_while(lock.lock().unwrap(), |pending| *pending).unwrap();
}

/// In case of node and worker version mismatch (as a result of in-place upgrade), send `SIGTERM`
/// to the node to tear it down and prevent it from raising disputes on valid candidates. Node
/// restart should be handled by the node owner. As node exits, unix sockets opened to workers
Expand Down
122 changes: 80 additions & 42 deletions node/core/pvf/worker/src/execute.rs
Expand Up @@ -15,20 +15,23 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{
common::{bytes_to_path, cpu_time_monitor_loop, worker_event_loop},
executor_intf::Executor,
common::{
bytes_to_path, cond_notify_all, cond_wait_while, cpu_time_monitor_loop,
stringify_panic_payload, worker_event_loop,
},
executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE},
LOG_TARGET,
};
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf::{
framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response,
};
use polkadot_parachain::primitives::ValidationResult;
use std::{
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
sync::{mpsc::channel, Arc, Condvar, Mutex},
thread,
time::Duration,
};
use tokio::{io, net::UnixStream};
Expand Down Expand Up @@ -67,18 +70,22 @@ async fn send_response(stream: &mut UnixStream, response: Response) -> io::Resul
framed_send(stream, &response.encode()).await
}

/// The entrypoint that the spawned execute worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host. The `node_version`, if `Some`,
/// is checked against the worker version. A mismatch results in immediate worker termination.
/// `None` is used for tests and in other situations when version check is not necessary.
/// The entrypoint that the spawned execute worker should start with.
///
/// # Parameters
///
/// The `socket_path` specifies the path to the socket used to communicate with the host. The
/// `node_version`, if `Some`, is checked against the worker version. A mismatch results in
/// immediate worker termination. `None` is used for tests and in other situations when version
/// check is not necessary.
pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
worker_event_loop("execute", socket_path, node_version, |rt_handle, mut stream| async move {
worker_event_loop("execute", socket_path, node_version, |mut stream| async move {
let worker_pid = std::process::id();

let handshake = recv_handshake(&mut stream).await?;
let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
let executor = Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
})?;

loop {
let (artifact_path, params, execution_timeout) = recv_request(&mut stream).await?;
Expand All @@ -89,31 +96,57 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
artifact_path.display(),
);

// Used to signal to the cpu time monitor thread that it can finish.
let (finished_tx, finished_rx) = channel::<()>();
// Conditional variable to notify us when a thread is done.
let cond_main = Arc::new((Mutex::new(true), Condvar::new()));
let cond_cpu = Arc::clone(&cond_main);
let cond_job = Arc::clone(&cond_main);

let cpu_time_start = ProcessTime::now();

// Spawn a new thread that runs the CPU time monitor.
let cpu_time_monitor_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
})
.fuse();
let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>();
let cpu_time_monitor_thread = thread::spawn(move || {
let result =
cpu_time_monitor_loop(cpu_time_start, execution_timeout, cpu_time_monitor_rx);
cond_notify_all(cond_cpu);
result
});
let executor_2 = executor.clone();
let execute_fut = rt_handle
.spawn_blocking(move || {
validate_using_artifact(&artifact_path, &params, executor_2, cpu_time_start)
})
.fuse();

pin_mut!(cpu_time_monitor_fut);
pin_mut!(execute_fut);

let response = select_biased! {
// If this future is not selected, the join handle is dropped and the thread will
// finish in the background.
cpu_time_monitor_res = cpu_time_monitor_fut => {
match cpu_time_monitor_res {
let execute_thread =
thread::Builder::new().stack_size(EXECUTE_THREAD_STACK_SIZE).spawn(move || {
let result = validate_using_artifact(
&artifact_path,
&params,
executor_2,
cpu_time_start,
);
cond_notify_all(cond_job);
result
})?;

// Wait for one of the threads to finish.
cond_wait_while(cond_main);

// A thread has signaled completion but it may not be "joinable" yet, so loop for a bit.
let response = loop {
// NOTE: We check the worker thread first to give it priority. This is for the rare
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
// case where it barely finishes in time, so we don't discard its result. On the
// other hand, if its measured CPU time is over the timeout, we will reject the
// candidate later on the host-side.
if execute_thread.is_finished() {
let _ = cpu_time_monitor_tx.send(());
break execute_thread.join().unwrap_or_else(|e| {
// TODO: Use `Panic` error once that is implemented.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe have an issue for this , rather than TODO in the code ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already addressed it here, it's approved so I'll merge it right after this PR. (I had planned to do these changes right after another so I left the TODO as a marker for myself, did the change, and set 7155's merge target to this branch. Will do issues instead in the future. 👍)

Response::format_internal(
"execute thread error",
&stringify_panic_payload(e),
)
})
}
// If this thread is not selected, we signal it to end, the join handle is dropped
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
// and the thread will finish in the background.
if cpu_time_monitor_thread.is_finished() {
break match cpu_time_monitor_thread.join() {
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
Expand All @@ -125,14 +158,17 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
);
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::format_internal("cpu time monitor thread error", &e.to_string()),
Ok(None) => Response::InternalError(
"error communicating over finished channel".into(),
),
// We can use an internal error here because errors in this thread are
// independent of the candidate.
Err(e) => Response::format_internal(
"cpu time monitor thread error",
&stringify_panic_payload(e),
),
}
},
execute_res = execute_fut => {
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::format_internal("execute thread error", &e.to_string()))
},
}
};

send_response(&mut stream, response).await?;
Expand All @@ -143,7 +179,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {
fn validate_using_artifact(
artifact_path: &Path,
params: &[u8],
executor: Arc<Executor>,
executor: Executor,
cpu_time_start: ProcessTime,
) -> Response {
// Check here if the file exists, because the error from Substrate is not match-able.
Expand All @@ -163,13 +199,15 @@ fn validate_using_artifact(
Ok(d) => d,
};

let duration = cpu_time_start.elapsed();

let result_descriptor = match ValidationResult::decode(&mut &descriptor_bytes[..]) {
Err(err) =>
return Response::format_invalid("validation result decoding failed", &err.to_string()),
Ok(r) => r,
};

// Include the decoding in the measured time, to prevent any potential attacks exploiting some
// bug in decoding.
let duration = cpu_time_start.elapsed();

Response::Ok { result_descriptor, duration }
}