Skip to content

Commit

Permalink
chore(driver-adapters): partial revert of #4499
Browse files Browse the repository at this point in the history
  • Loading branch information
jkomyno committed Nov 30, 2023
1 parent 48b9381 commit 0cd2541
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod external_process;
use super::*;
use external_process::*;
use serde::de::DeserializeOwned;
use std::sync::atomic::AtomicU64;
use std::{collections::HashMap, sync::atomic::AtomicU64};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

pub(crate) async fn executor_process_request<T: DeserializeOwned>(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
use super::*;
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use std::{
error::Error as StdError,
fmt::Display,
io::Write as _,
sync::{atomic::Ordering, Arc},
};
use tokio::sync::{mpsc, oneshot, RwLock};
use std::{fmt::Display, io::Write as _, sync::atomic::Ordering};
use tokio::sync::{mpsc, oneshot};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

Expand All @@ -34,17 +29,6 @@ fn exit_with_message(status_code: i32, message: &str) -> ! {
}

impl ExecutorProcess {
fn spawn() -> ExecutorProcess {
match std::thread::spawn(ExecutorProcess::new).join() {
Ok(Ok(process)) => process,
Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")),
Err(err) => {
let err = err.downcast_ref::<String>().map(ToOwned::to_owned).unwrap_or_default();
exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}"))
}
}
}

fn new() -> Result<ExecutorProcess> {
let (sender, receiver) = mpsc::channel::<ReqImpl>(300);

Expand Down Expand Up @@ -97,50 +81,15 @@ impl ExecutorProcess {
}
}

/// Wraps an ExecutorProcess allowing for restarting it.
///
/// A node process can die for a number of reasons, being one that any `panic!` occurring in Rust
/// asynchronous code are translated to an abort trap by wasm-bindgen, which kills the node process.
#[derive(Clone)]
pub(crate) struct RestartableExecutorProcess {
process: Arc<RwLock<ExecutorProcess>>,
}

impl RestartableExecutorProcess {
fn new() -> Self {
Self {
process: Arc::new(RwLock::new(ExecutorProcess::spawn())),
pub(super) static EXTERNAL_PROCESS: Lazy<ExecutorProcess> =
Lazy::new(|| match std::thread::spawn(ExecutorProcess::new).join() {
Ok(Ok(process)) => process,
Ok(Err(err)) => exit_with_message(1, &format!("Failed to start node process. Details: {err}")),
Err(err) => {
let err = err.downcast_ref::<String>().map(ToOwned::to_owned).unwrap_or_default();
exit_with_message(1, &format!("Panic while trying to start node process.\nDetails: {err}"))
}
}

async fn restart(&self) {
let mut process = self.process.write().await;
*process = ExecutorProcess::spawn();
}

pub(crate) async fn request<T: DeserializeOwned>(&self, method: &str, params: serde_json::Value) -> Result<T> {
let p = self.process.read().await;
p.request(method, params).await
}
}

struct ExecutorProcessDiedError;

impl fmt::Debug for ExecutorProcessDiedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "The external test executor process died")
}
}

impl Display for ExecutorProcessDiedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}

impl StdError for ExecutorProcessDiedError {}

pub(super) static EXTERNAL_PROCESS: Lazy<RestartableExecutorProcess> = Lazy::new(RestartableExecutorProcess::new);
});

type ReqImpl = (
jsonrpc_core::MethodCall,
Expand Down Expand Up @@ -173,7 +122,8 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {

let mut stdout = BufReader::new(process.stdout.unwrap()).lines();
let mut stdin = process.stdin.unwrap();
let mut last_pending_request: Option<(jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>)> = None;
let mut pending_requests: HashMap<jsonrpc_core::Id, oneshot::Sender<Result<serde_json::value::Value>>> =
HashMap::new();

loop {
tokio::select! {
Expand All @@ -187,11 +137,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
{
match serde_json::from_str::<jsonrpc_core::Output>(&line) {
Ok(response) => {
let (id, sender) = last_pending_request.take().expect("got a response from the external process, but there was no pending request");
if &id != response.id() {
unreachable!("got a response from the external process, but the id didn't match. Are you running with cargo tests with `--test-threads=1`");
}

let sender = pending_requests.remove(response.id()).unwrap();
match response {
jsonrpc_core::Output::Success(success) => {
// The other end may be dropped if the whole
Expand All @@ -213,12 +159,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
}
Ok(None) => // end of the stream
{
tracing::error!("Error when reading from child node process. Process might have exited. Restarting...");
if let Some((_, sender)) = last_pending_request.take() {
sender.send(Err(Box::new(ExecutorProcessDiedError))).unwrap();
}
EXTERNAL_PROCESS.restart().await;
break;
exit_with_message(1, "child node process stdout closed")
}
Err(err) => // log it
{
Expand All @@ -233,7 +174,7 @@ fn start_rpc_thread(mut receiver: mpsc::Receiver<ReqImpl>) -> Result<()> {
exit_with_message(1, "The json-rpc client channel was closed");
}
Some((request, response_sender)) => {
last_pending_request = Some((request.id.clone(), response_sender));
pending_requests.insert(request.id.clone(), response_sender);
let mut req = serde_json::to_vec(&request).unwrap();
req.push(b'\n');
stdin.write_all(&req).await.unwrap();
Expand Down

0 comments on commit 0cd2541

Please sign in to comment.