diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs
index 0c8f7867c556..b3c44064a922 100644
--- a/node/core/candidate-validation/src/lib.rs
+++ b/node/core/candidate-validation/src/lib.rs
@@ -24,7 +24,8 @@
 #![warn(missing_docs)]
 
 use polkadot_node_core_pvf::{
-	InvalidCandidate as WasmInvalidCandidate, PrepareError, Pvf, ValidationError, ValidationHost,
+	InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError,
+	ValidationHost,
 };
 use polkadot_node_primitives::{
 	BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
@@ -654,7 +655,7 @@ trait ValidationBackend {
 		validation_result
 	}
 
-	async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
+	async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError>;
 }
 
 #[async_trait]
@@ -680,7 +681,7 @@ impl ValidationBackend for ValidationHost {
 			.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
 	}
 
-	async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
+	async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError> {
 		let (tx, rx) = oneshot::channel();
 		if let Err(err) = self.precheck_pvf(pvf, tx).await {
 			// Return an IO error if there was an error communicating with the host.
diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs
index c47df9589e47..779bf0fcca33 100644
--- a/node/core/candidate-validation/src/tests.rs
+++ b/node/core/candidate-validation/src/tests.rs
@@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
 		result
 	}
 
-	async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
+	async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
 		unreachable!()
 	}
 }
@@ -894,11 +894,11 @@ fn pov_decompression_failure_is_invalid() {
 }
 
 struct MockPreCheckBackend {
-	result: Result<Duration, PrepareError>,
+	result: Result<PrepareStats, PrepareError>,
 }
 
 impl MockPreCheckBackend {
-	fn with_hardcoded_result(result: Result<Duration, PrepareError>) -> Self {
+	fn with_hardcoded_result(result: Result<PrepareStats, PrepareError>) -> Self {
 		Self { result }
 	}
 }
@@ -914,7 +914,7 @@ impl ValidationBackend for MockPreCheckBackend {
 		unreachable!()
 	}
 
-	async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<Duration, PrepareError> {
+	async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
 		self.result.clone()
 	}
 }
@@ -931,7 +931,7 @@ fn precheck_works() {
 
 	let (check_fut, check_result) = precheck_pvf(
 		ctx.sender(),
-		MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
+		MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
 		relay_parent,
 		validation_code_hash,
 	)
@@ -977,7 +977,7 @@ fn precheck_invalid_pvf_blob_compression() {
 
 	let (check_fut, check_result) = precheck_pvf(
 		ctx.sender(),
-		MockPreCheckBackend::with_hardcoded_result(Ok(Duration::default())),
+		MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
 		relay_parent,
 		validation_code_hash,
 	)
diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs
index 297ed0829cca..d2e1e1e90878 100644
--- a/node/core/pvf/src/artifacts.rs
+++ b/node/core/pvf/src/artifacts.rs
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
-use crate::{error::PrepareError, host::PrepareResultSender};
+use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
 use always_assert::always;
 use polkadot_parachain::primitives::ValidationCodeHash;
 use std::{
@@ -101,8 +101,8 @@ pub enum ArtifactState {
 		/// This is updated when we get the heads up for this artifact or when we just discover
 		/// this file.
 		last_time_needed: SystemTime,
-		/// The CPU time that was taken preparing this artifact.
-		cpu_time_elapsed: Duration,
+		/// Stats produced by successful preparation.
+		prepare_stats: PrepareStats,
 	},
 	/// A task to prepare this artifact is scheduled.
 	Preparing {
@@ -177,12 +177,12 @@ impl Artifacts {
 		&mut self,
 		artifact_id: ArtifactId,
 		last_time_needed: SystemTime,
-		cpu_time_elapsed: Duration,
+		prepare_stats: PrepareStats,
 	) {
 		// See the precondition.
 		always!(self
 			.artifacts
-			.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
+			.insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
 			.is_none());
 	}
 
diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs
index a679b2f96062..3f642cd6ed24 100644
--- a/node/core/pvf/src/error.rs
+++ b/node/core/pvf/src/error.rs
@@ -14,12 +14,13 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+use crate::prepare::PrepareStats;
 use parity_scale_codec::{Decode, Encode};
-use std::{any::Any, fmt, time::Duration};
+use std::{any::Any, fmt};
 
-/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
+/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
 /// successful
-pub type PrepareResult = Result<Duration, PrepareError>;
+pub type PrepareResult = Result<PrepareStats, PrepareError>;
 
 /// An error that occurred during the prepare part of the PVF pipeline.
 #[derive(Debug, Clone, Encode, Decode)]
diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index 956c580380a0..0ee0b1442fda 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -456,9 +456,9 @@ async fn handle_precheck_pvf(
 
 	if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
 		match state {
-			ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
+			ArtifactState::Prepared { last_time_needed, prepare_stats } => {
 				*last_time_needed = SystemTime::now();
-				let _ = result_sender.send(Ok(*cpu_time_elapsed));
+				let _ = result_sender.send(Ok(prepare_stats.clone()));
 			},
 			ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
 				waiting_for_response.push(result_sender),
@@ -725,8 +725,8 @@ async fn handle_prepare_done(
 	}
 
 	*state = match result {
-		Ok(cpu_time_elapsed) =>
-			ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
+		Ok(prepare_stats) =>
+			ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats },
 		Err(error) => {
 			let last_time_failed = SystemTime::now();
 			let num_failures = *num_failures + 1;
@@ -834,7 +834,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()
 
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use crate::{InvalidCandidate, PrepareError};
+	use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
 	use assert_matches::assert_matches;
 	use futures::future::BoxFuture;
@@ -1056,8 +1056,12 @@ mod tests {
 		let mut builder = Builder::default();
 		builder.cleanup_pulse_interval = Duration::from_millis(100);
 		builder.artifact_ttl = Duration::from_millis(500);
-		builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
-		builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
+		builder
+			.artifacts
+			.insert_prepared(artifact_id(1), mock_now, PrepareStats::default());
+		builder
+			.artifacts
+			.insert_prepared(artifact_id(2), mock_now, PrepareStats::default());
 		let mut test = builder.build();
 		let mut host = test.host_handle();
 
@@ -1129,7 +1133,7 @@ mod tests {
 		test.from_prepare_queue_tx
 			.send(prepare::FromQueue {
 				artifact_id: artifact_id(1),
-				result: Ok(Duration::default()),
+				result: Ok(PrepareStats::default()),
 			})
 			.await
 			.unwrap();
@@ -1145,7 +1149,7 @@ mod tests {
 		test.from_prepare_queue_tx
 			.send(prepare::FromQueue {
 				artifact_id: artifact_id(2),
-				result: Ok(Duration::default()),
+				result: Ok(PrepareStats::default()),
 			})
 			.await
 			.unwrap();
@@ -1197,7 +1201,7 @@ mod tests {
 		test.from_prepare_queue_tx
 			.send(prepare::FromQueue {
 				artifact_id: artifact_id(1),
-				result: Ok(Duration::default()),
+				result: Ok(PrepareStats::default()),
 			})
 			.await
 			.unwrap();
@@ -1304,7 +1308,7 @@ mod tests {
 		test.from_prepare_queue_tx
 			.send(prepare::FromQueue {
 				artifact_id: artifact_id(2),
-				result: Ok(Duration::default()),
+				result: Ok(PrepareStats::default()),
 			})
 			.await
 			.unwrap();
@@ -1454,7 +1458,7 @@ mod tests {
 		test.from_prepare_queue_tx
 			.send(prepare::FromQueue {
 				artifact_id: artifact_id(1),
-				result: Ok(Duration::default()),
+				result: Ok(PrepareStats::default()),
 			})
 			.await
 			.unwrap();
@@ -1630,7 +1634,7 @@ mod tests {
 		test.from_prepare_queue_tx
 			.send(prepare::FromQueue {
 				artifact_id: artifact_id(1),
-				result: Ok(Duration::default()),
+				result: Ok(PrepareStats::default()),
 			})
 			.await
 			.unwrap();
diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs
index 0e858147bd29..04c7d5323b30 100644
--- a/node/core/pvf/src/lib.rs
+++ b/node/core/pvf/src/lib.rs
@@ -108,6 +108,7 @@ pub mod testing;
 pub use sp_tracing;
 
 pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
+pub use prepare::PrepareStats;
 pub use priority::Priority;
 pub use pvf::Pvf;
 
diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs
index 07a2bf46f530..5995261d5c22 100644
--- a/node/core/pvf/src/metrics.rs
+++ b/node/core/pvf/src/metrics.rs
@@ -16,6 +16,7 @@
 
 //! Prometheus metrics related to the validation host.
 
+use crate::prepare::MemoryStats;
 use polkadot_node_metrics::metrics::{self, prometheus};
 
 /// Validation host metrics.
@@ -73,24 +74,24 @@ impl Metrics {
 		self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
 	}
 
-	/// Observe max_rss for preparation.
-	pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
+	/// Observe memory stats for preparation.
+	#[allow(unused_variables)]
+	pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) {
 		if let Some(metrics) = &self.0 {
-			metrics.preparation_max_rss.observe(max_rss);
-		}
-	}
+			#[cfg(target_os = "linux")]
+			if let Some(max_rss) = memory_stats.max_rss {
+				metrics.preparation_max_rss.observe(max_rss as f64);
+			}
 
-	/// Observe max resident memory for preparation.
-	pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
-		if let Some(metrics) = &self.0 {
-			metrics.preparation_max_resident.observe(max_resident_kb);
-		}
-	}
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
+			if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
+				// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
+				let max_resident_kb = (tracker_stats.resident / 1024) as f64;
+				let max_allocated_kb = (tracker_stats.allocated / 1024) as f64;
 
-	/// Observe max allocated memory for preparation.
-	pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
-		if let Some(metrics) = &self.0 {
-			metrics.preparation_max_allocated.observe(max_allocated_kb);
+				metrics.preparation_max_resident.observe(max_resident_kb);
+				metrics.preparation_max_allocated.observe(max_allocated_kb);
+			}
 		}
 	}
 }
@@ -106,8 +107,11 @@ struct MetricsInner {
 	execute_finished: prometheus::Counter<prometheus::U64>,
 	preparation_time: prometheus::Histogram,
 	execution_time: prometheus::Histogram,
+	#[cfg(target_os = "linux")]
 	preparation_max_rss: prometheus::Histogram,
+	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 	preparation_max_allocated: prometheus::Histogram,
+	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 	preparation_max_resident: prometheus::Histogram,
 }
 
@@ -226,6 +230,7 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			#[cfg(target_os = "linux")]
 			preparation_max_rss: prometheus::register(
 				prometheus::Histogram::with_opts(
 					prometheus::HistogramOpts::new(
@@ -238,6 +243,7 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 			preparation_max_resident: prometheus::register(
 				prometheus::Histogram::with_opts(
 					prometheus::HistogramOpts::new(
@@ -250,6 +256,7 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
+			#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 			preparation_max_allocated: prometheus::register(
 				prometheus::Histogram::with_opts(
 					prometheus::HistogramOpts::new(
diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs
index 013c0017e623..0e27fac20875 100644
--- a/node/core/pvf/src/prepare/memory_stats.rs
+++ b/node/core/pvf/src/prepare/memory_stats.rs
@@ -27,18 +27,18 @@
 //! for more
 //! background.
 
-use crate::{metrics::Metrics, LOG_TARGET};
 use parity_scale_codec::{Decode, Encode};
-use std::io;
 
 /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if
 /// supported by the OS, `ru_maxrss`.
-#[derive(Encode, Decode)]
+#[derive(Clone, Debug, Default, Encode, Decode)]
 pub struct MemoryStats {
 	/// Memory stats from `tikv_jemalloc_ctl`.
+	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 	pub memory_tracker_stats: Option<MemoryAllocationStats>,
 	/// `ru_maxrss` from `getrusage`. A string error since `io::Error` is not `Encode`able.
-	pub max_rss: Option<Result<i64, String>>,
+	#[cfg(target_os = "linux")]
+	pub max_rss: Option<i64>,
 }
 
 /// Statistics of collected memory metrics.
@@ -51,44 +51,14 @@ pub struct MemoryAllocationStats {
 	pub allocated: u64,
 }
 
-/// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just
-/// returns `None`.
-pub fn get_max_rss_thread() -> Option<io::Result<i64>> {
-	// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works.
-	#[cfg(target_os = "linux")]
-	let max_rss = Some(getrusage::getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss)));
-	#[cfg(not(target_os = "linux"))]
-	let max_rss = None;
-	max_rss
-}
-
-/// Helper function to send the memory metrics, if available, to prometheus.
-pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) {
-	if let Some(max_rss) = memory_stats.max_rss {
-		match max_rss {
-			Ok(max_rss) => metrics.observe_preparation_max_rss(max_rss as f64),
-			Err(err) => gum::warn!(
-				target: LOG_TARGET,
-				worker_pid = %pid,
-				"error getting `ru_maxrss` in preparation thread: {}",
-				err
-			),
-		}
-	}
-
-	if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
-		// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
-		let resident_kb = (tracker_stats.resident / 1024) as f64;
-		let allocated_kb = (tracker_stats.allocated / 1024) as f64;
-
-		metrics.observe_preparation_max_resident(resident_kb);
-		metrics.observe_preparation_max_allocated(allocated_kb);
-	}
-}
-
+/// Module for the memory tracker. The memory tracker runs in its own thread, where it polls memory
+/// usage at an interval.
+///
+/// NOTE: Requires jemalloc enabled.
 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 pub mod memory_tracker {
 	use super::*;
+	use crate::LOG_TARGET;
 	use std::{
 		sync::mpsc::{Receiver, RecvTimeoutError, Sender},
 		time::Duration,
 	};
@@ -183,13 +153,15 @@ pub mod memory_tracker {
 	pub async fn get_memory_tracker_loop_stats(
 		fut: JoinHandle<Result<MemoryAllocationStats, String>>,
 		tx: Sender<()>,
+		worker_pid: u32,
 	) -> Option<MemoryAllocationStats> {
 		// Signal to the memory tracker thread to terminate.
 		if let Err(err) = tx.send(()) {
 			gum::warn!(
 				target: LOG_TARGET,
-				worker_pid = %std::process::id(),
-				"worker: error sending signal to memory tracker_thread: {}", err
+				%worker_pid,
+				"worker: error sending signal to memory tracker_thread: {}",
+				err
 			);
 			None
 		} else {
@@ -199,7 +171,7 @@ pub mod memory_tracker {
 				Ok(Err(err)) => {
 					gum::warn!(
 						target: LOG_TARGET,
-						worker_pid = %std::process::id(),
+						%worker_pid,
 						"worker: error occurred in the memory tracker thread: {}", err
 					);
 					None
@@ -207,7 +179,7 @@ pub mod memory_tracker {
 				Err(err) => {
 					gum::warn!(
 						target: LOG_TARGET,
-						worker_pid = %std::process::id(),
+						%worker_pid,
 						"worker: error joining on memory tracker thread: {}", err
 					);
 					None
@@ -217,13 +189,19 @@ pub mod memory_tracker {
 	}
 }
 
+/// Module for dealing with the `ru_maxrss` (peak resident memory) stat from `getrusage`.
+///
+/// NOTE: `getrusage` with the `RUSAGE_THREAD` parameter is only supported on Linux. `RUSAGE_SELF`
+/// works on MacOS, but we need to get the max rss only for the preparation thread. Gettng it for
+/// the current process would conflate the stats of previous jobs run by the process.
 #[cfg(target_os = "linux")]
-mod getrusage {
+pub mod max_rss_stat {
+	use crate::LOG_TARGET;
 	use libc::{getrusage, rusage, timeval, RUSAGE_THREAD};
 	use std::io;
 
 	/// Get the rusage stats for the current thread.
-	pub fn getrusage_thread() -> io::Result<rusage> {
+	fn getrusage_thread() -> io::Result<rusage> {
 		let mut result = rusage {
 			ru_utime: timeval { tv_sec: 0, tv_usec: 0 },
 			ru_stime: timeval { tv_sec: 0, tv_usec: 0 },
@@ -247,4 +225,25 @@ mod getrusage {
 		}
 		Ok(result)
 	}
+
+	/// Gets the `ru_maxrss` for the current thread.
+	pub fn get_max_rss_thread() -> io::Result<i64> {
+		// `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works.
+		getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))
+	}
+
+	/// Extracts the max_rss stat and logs any error.
+	pub fn extract_max_rss_stat(max_rss: io::Result<i64>, worker_pid: u32) -> Option<i64> {
+		max_rss
+			.map_err(|err| {
+				gum::warn!(
+					target: LOG_TARGET,
+					%worker_pid,
+					"error getting `ru_maxrss` in preparation thread: {}",
+					err
+				);
+				err
+			})
+			.ok()
+	}
 }
diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs
index 4cbd63eff7d2..bac212e514b0 100644
--- a/node/core/pvf/src/prepare/mod.rs
+++ b/node/core/pvf/src/prepare/mod.rs
@@ -27,6 +27,16 @@ mod pool;
 mod queue;
 mod worker;
 
+pub use memory_stats::MemoryStats;
 pub use pool::start as start_pool;
 pub use queue::{start as start_queue, FromQueue, ToQueue};
 pub use worker::worker_entrypoint;
+
+use parity_scale_codec::{Decode, Encode};
+
+/// Preparation statistics, including the CPU time and memory taken.
+#[derive(Debug, Clone, Default, Encode, Decode)]
+pub struct PrepareStats {
+	cpu_time_elapsed: std::time::Duration,
+	memory_stats: MemoryStats,
+}
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index 32e9bfa70748..d8dd90688c4f 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -495,6 +495,7 @@ mod tests {
 	use crate::{
 		error::PrepareError,
 		host::{LENIENT_PREPARATION_TIMEOUT, PRECHECK_PREPARATION_TIMEOUT},
+		prepare::PrepareStats,
 	};
 	use assert_matches::assert_matches;
 	use futures::{future::BoxFuture, FutureExt};
@@ -622,7 +623,7 @@ mod tests {
 		test.send_from_pool(pool::FromPool::Concluded {
 			worker: w,
 			rip: false,
-			result: Ok(Duration::default()),
+			result: Ok(PrepareStats::default()),
 		});
 
 		assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
@@ -660,7 +661,7 @@ mod tests {
 		test.send_from_pool(pool::FromPool::Concluded {
 			worker: w1,
 			rip: false,
-			result: Ok(Duration::default()),
+			result: Ok(PrepareStats::default()),
 		});
 
 		assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
@@ -710,7 +711,7 @@ mod tests {
 		test.send_from_pool(pool::FromPool::Concluded {
 			worker: w1,
 			rip: false,
-			result: Ok(Duration::default()),
+			result: Ok(PrepareStats::default()),
 		});
 		assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
 	}
@@ -746,7 +747,7 @@ mod tests {
 		test.send_from_pool(pool::FromPool::Concluded {
 			worker: w1,
 			rip: true,
-			result: Ok(Duration::default()),
+			result: Ok(PrepareStats::default()),
 		});
 
 		// Since there is still work, the queue requested one extra worker to spawn to handle the
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs
index 69c1d3bb5557..3e64777a9c17 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker.rs
@@ -14,13 +14,16 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+#[cfg(target_os = "linux")]
+use super::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread};
 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop};
-use super::memory_stats::{get_max_rss_thread, observe_memory_metrics, MemoryStats};
+use super::memory_stats::MemoryStats;
 use crate::{
 	artifacts::CompiledArtifact,
 	error::{PrepareError, PrepareResult},
 	metrics::Metrics,
+	prepare::PrepareStats,
 	worker_common::{
 		bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
 		spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
@@ -118,12 +121,11 @@ pub async fn start_work(
 
 	match result {
 		// Received bytes from worker within the time limit.
-		Ok(Ok((prepare_result, memory_stats))) =>
+		Ok(Ok(prepare_result)) =>
 			handle_response(
 				metrics,
 				IdleWorker { stream, pid },
 				prepare_result,
-				memory_stats,
 				pid,
 				tmp_file,
 				artifact_path,
@@ -162,13 +164,12 @@ async fn handle_response(
 	metrics: &Metrics,
 	worker: IdleWorker,
 	result: PrepareResult,
-	memory_stats: Option<MemoryStats>,
-	pid: u32,
+	worker_pid: u32,
 	tmp_file: PathBuf,
 	artifact_path: PathBuf,
 	preparation_timeout: Duration,
 ) -> Outcome {
-	let cpu_time_elapsed = match result {
+	let PrepareStats { cpu_time_elapsed, memory_stats } = match result.clone() {
 		Ok(result) => result,
 		// Timed out on the child. This should already be logged by the child.
 		Err(PrepareError::TimedOut) => return Outcome::TimedOut,
@@ -179,7 +180,7 @@ async fn handle_response(
 		// The job didn't complete within the timeout.
 		gum::warn!(
 			target: LOG_TARGET,
-			worker_pid = %pid,
+			%worker_pid,
 			"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
 			cpu_time_elapsed.as_millis(),
 			preparation_timeout.as_millis(),
@@ -190,7 +191,7 @@ async fn handle_response(
 
 	gum::debug!(
 		target: LOG_TARGET,
-		worker_pid = %pid,
+		%worker_pid,
 		"promoting WIP artifact {} to {}",
 		tmp_file.display(),
 		artifact_path.display(),
@@ -201,7 +202,7 @@ async fn handle_response(
 		Err(err) => {
 			gum::warn!(
 				target: LOG_TARGET,
-				worker_pid = %pid,
+				%worker_pid,
 				"failed to rename the artifact from {} to {}: {:?}",
 				tmp_file.display(),
 				artifact_path.display(),
@@ -213,9 +214,7 @@ async fn handle_response(
 
 	// If there were no errors up until now, log the memory stats for a successful preparation, if
 	// available.
-	if let Some(memory_stats) = memory_stats {
-		observe_memory_metrics(metrics, memory_stats, pid);
-	}
+	metrics.observe_preparation_memory_metrics(memory_stats);
 
 	outcome
 }
@@ -299,19 +298,11 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf,
 	Ok((code, tmp_file, preparation_timeout))
 }
 
-async fn send_response(
-	stream: &mut UnixStream,
-	result: PrepareResult,
-	memory_stats: Option<MemoryStats>,
-) -> io::Result<()> {
-	framed_send(stream, &result.encode()).await?;
-	framed_send(stream, &memory_stats.encode()).await
+async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
+	framed_send(stream, &result.encode()).await
 }
 
-async fn recv_response(
-	stream: &mut UnixStream,
-	pid: u32,
-) -> io::Result<(PrepareResult, Option<MemoryStats>)> {
+async fn recv_response(stream: &mut UnixStream, pid: u32) -> io::Result<PrepareResult> {
 	let result = framed_recv(stream).await?;
 	let result = PrepareResult::decode(&mut &result[..]).map_err(|e| {
 		// We received invalid bytes from the worker.
@@ -327,14 +318,7 @@ async fn recv_response(
 			format!("prepare pvf recv_response: failed to decode result: {:?}", e),
 		)
 	})?;
-	let memory_stats = framed_recv(stream).await?;
-	let memory_stats = Option::<MemoryStats>::decode(&mut &memory_stats[..]).map_err(|e| {
-		io::Error::new(
-			io::ErrorKind::Other,
-			format!("prepare pvf recv_response: failed to decode memory stats: {:?}", e),
-		)
-	})?;
-	Ok((result, memory_stats))
+	Ok(result)
 }
 
 /// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
@@ -362,10 +346,11 @@ pub fn worker_entrypoint(socket_path: &str) {
 	worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
 		loop {
+			let worker_pid = std::process::id();
 			let (code, dest, preparation_timeout) = recv_request(&mut stream).await?;
 
 			gum::debug!(
 				target: LOG_TARGET,
-				worker_pid = %std::process::id(),
+				%worker_pid,
 				"worker: preparing artifact",
 			);
 
@@ -387,28 +372,29 @@ pub fn worker_entrypoint(socket_path: &str) {
 			// Spawn another thread for preparation.
 			let prepare_fut = rt_handle
 				.spawn_blocking(move || {
-					let prepare_result = prepare_artifact(&code);
+					let result = prepare_artifact(&code);
 
 					// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
-					let max_rss = get_max_rss_thread();
+					#[cfg(target_os = "linux")]
+					let result = result.map(|artifact| (artifact, get_max_rss_thread()));
 
-					(prepare_result, max_rss)
+					result
 				})
 				.fuse();
 
 			pin_mut!(cpu_time_monitor_fut);
 			pin_mut!(prepare_fut);
 
-			let (result, memory_stats) = select_biased! {
+			let result = select_biased! {
 				// If this future is not selected, the join handle is dropped and the thread will
 				// finish in the background.
 				join_res = cpu_time_monitor_fut => {
-					let result = match join_res {
+					match join_res {
 						Ok(Some(cpu_time_elapsed)) => {
 							// Log if we exceed the timeout and the other thread hasn't finished.
 							gum::warn!(
 								target: LOG_TARGET,
-								worker_pid = %std::process::id(),
+								%worker_pid,
 								"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
 								cpu_time_elapsed.as_millis(),
 								preparation_timeout.as_millis(),
@@ -417,28 +403,29 @@ pub fn worker_entrypoint(socket_path: &str) {
 						},
 						Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
 						Err(err) => Err(PrepareError::IoErr(err.to_string())),
-					};
-					(result, None)
+					}
 				},
-				compilation_res = prepare_fut => {
+				prepare_res = prepare_fut => {
 					let cpu_time_elapsed = cpu_time_start.elapsed();
 					let _ = cpu_time_monitor_tx.send(());
 
-					match compilation_res.unwrap_or_else(|err| (Err(PrepareError::IoErr(err.to_string())), None)) {
-						(Err(err), _) => {
+					match prepare_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
+						Err(err) => {
 							// Serialized error will be written into the socket.
-							(Err(err), None)
+							Err(err)
 						},
-						(Ok(compiled_artifact), max_rss) => {
+						Ok(ok) => {
 							// Stop the memory stats worker and get its observed memory stats.
 							#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 							let memory_tracker_stats =
-								get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await;
-							#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
-							let memory_tracker_stats = None;
+								get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx, worker_pid).await;
+							#[cfg(target_os = "linux")]
+							let (ok, max_rss) = ok;
 							let memory_stats = MemoryStats {
+								#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
 								memory_tracker_stats,
-								max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())),
+								#[cfg(target_os = "linux")]
+								max_rss: extract_max_rss_stat(max_rss, worker_pid),
 							};
 
 							// Write the serialized artifact into a temp file.
@@ -450,19 +437,19 @@ pub fn worker_entrypoint(socket_path: &str) {
 
 							gum::debug!(
 								target: LOG_TARGET,
-								worker_pid = %std::process::id(),
+								%worker_pid,
 								"worker: writing artifact to {}",
 								dest.display(),
 							);
-							tokio::fs::write(&dest, &compiled_artifact).await?;
+							tokio::fs::write(&dest, &ok).await?;
 
-							(Ok(cpu_time_elapsed), Some(memory_stats))
+							Ok(PrepareStats{cpu_time_elapsed, memory_stats})
 						},
 					}
 				},
 			};
 
-			send_response(&mut stream, result, memory_stats).await?;
+			send_response(&mut stream, result).await?;
 		}
 	});
 }