Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Refactor PVF preparation memory stats
Browse files Browse the repository at this point in the history
The original purpose of this change was to gate metrics that are unsupported by
some systems behind conditional compilation directives (#[cfg]); see
#6675 (comment).

Then I started doing some random cleanups and simplifications and got a bit
carried away. 🙈 The code should be overall tidier than before.

Changes:
- Don't register unsupported metrics (e.g. `max_rss` on non-Linux systems)
- Introduce `PrepareStats` struct as an abstraction over the `Ok` values of
  `PrepareResult`. It is cleaner, and can be easily modified in the future.
- Other small changes
  • Loading branch information
mrcnski committed Feb 9, 2023
1 parent 06aec1e commit 995abe1
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 141 deletions.
7 changes: 4 additions & 3 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#![warn(missing_docs)]

use polkadot_node_core_pvf::{
InvalidCandidate as WasmInvalidCandidate, PrepareError, Pvf, ValidationError, ValidationHost,
InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError,
ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
Expand Down Expand Up @@ -654,7 +655,7 @@ trait ValidationBackend {
validation_result
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError>;
}

#[async_trait]
Expand All @@ -680,7 +681,7 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError> {
async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
Expand Down
10 changes: 5 additions & 5 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{error::PrepareError, host::PrepareResultSender};
use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
use always_assert::always;
use polkadot_parachain::primitives::ValidationCodeHash;
use std::{
Expand Down Expand Up @@ -101,8 +101,8 @@ pub enum ArtifactState {
/// This is updated when we get the heads up for this artifact or when we just discover
/// this file.
last_time_needed: SystemTime,
/// The CPU time that was taken preparing this artifact.
cpu_time_elapsed: Duration,
/// Stats produced by successful preparation.
prepare_stats: PrepareStats,
},
/// A task to prepare this artifact is scheduled.
Preparing {
Expand Down Expand Up @@ -177,12 +177,12 @@ impl Artifacts {
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
cpu_time_elapsed: Duration,
prepare_stats: PrepareStats,
) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, prepare_stats })
.is_none());
}

Expand Down
7 changes: 4 additions & 3 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prepare::PrepareStats;
use parity_scale_codec::{Decode, Encode};
use std::{any::Any, fmt, time::Duration};
use std::{any::Any, fmt};

/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if
/// successful
pub type PrepareResult = Result<Duration, PrepareError>;
pub type PrepareResult = Result<PrepareStats, PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
Expand Down
30 changes: 17 additions & 13 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,9 +456,9 @@ async fn handle_precheck_pvf(

if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { last_time_needed, cpu_time_elapsed } => {
ArtifactState::Prepared { last_time_needed, prepare_stats } => {
*last_time_needed = SystemTime::now();
let _ = result_sender.send(Ok(*cpu_time_elapsed));
let _ = result_sender.send(Ok(prepare_stats.clone()));
},
ArtifactState::Preparing { waiting_for_response, num_failures: _ } =>
waiting_for_response.push(result_sender),
Expand Down Expand Up @@ -725,8 +725,8 @@ async fn handle_prepare_done(
}

*state = match result {
Ok(cpu_time_elapsed) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), cpu_time_elapsed },
Ok(prepare_stats) =>
ArtifactState::Prepared { last_time_needed: SystemTime::now(), prepare_stats },
Err(error) => {
let last_time_failed = SystemTime::now();
let num_failures = *num_failures + 1;
Expand Down Expand Up @@ -834,7 +834,7 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()>
#[cfg(test)]
mod tests {
use super::*;
use crate::{InvalidCandidate, PrepareError};
use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError};
use assert_matches::assert_matches;
use futures::future::BoxFuture;

Expand Down Expand Up @@ -1056,8 +1056,12 @@ mod tests {
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
builder
.artifacts
.insert_prepared(artifact_id(1), mock_now, PrepareStats::default());
builder
.artifacts
.insert_prepared(artifact_id(2), mock_now, PrepareStats::default());
let mut test = builder.build();
let mut host = test.host_handle();

Expand Down Expand Up @@ -1129,7 +1133,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand All @@ -1145,7 +1149,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1197,7 +1201,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1304,7 +1308,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1454,7 +1458,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down Expand Up @@ -1630,7 +1634,7 @@ mod tests {
test.from_prepare_queue_tx
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
result: Ok(PrepareStats::default()),
})
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions node/core/pvf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub mod testing;
pub use sp_tracing;

pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority;
pub use pvf::Pvf;

Expand Down
37 changes: 22 additions & 15 deletions node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Prometheus metrics related to the validation host.

use crate::prepare::MemoryStats;
use polkadot_node_metrics::metrics::{self, prometheus};

/// Validation host metrics.
Expand Down Expand Up @@ -73,24 +74,24 @@ impl Metrics {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
}

/// Observe max_rss for preparation.
pub(crate) fn observe_preparation_max_rss(&self, max_rss: f64) {
/// Observe memory stats for preparation.
#[allow(unused_variables)]
pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_rss.observe(max_rss);
}
}
#[cfg(target_os = "linux")]
if let Some(max_rss) = memory_stats.max_rss {
metrics.preparation_max_rss.observe(max_rss);
}

/// Observe max resident memory for preparation.
pub(crate) fn observe_preparation_max_resident(&self, max_resident_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_resident.observe(max_resident_kb);
}
}
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
if let Some(tracker_stats) = memory_stats.memory_tracker_stats {
// We convert these stats from B to KB to match the unit of `ru_maxrss` from `getrusage`.
let max_resident_kb = (tracker_stats.resident / 1024) as f64;
let max_allocated_kb = (tracker_stats.allocated / 1024) as f64;

/// Observe max allocated memory for preparation.
pub(crate) fn observe_preparation_max_allocated(&self, max_allocated_kb: f64) {
if let Some(metrics) = &self.0 {
metrics.preparation_max_allocated.observe(max_allocated_kb);
metrics.preparation_max_resident.observe(max_resident_kb);
metrics.preparation_max_allocated.observe(max_allocated_kb);
}
}
}
}
Expand All @@ -106,8 +107,11 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::Histogram,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::Histogram,
}

Expand Down Expand Up @@ -226,6 +230,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(target_os = "linux")]
preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
Expand All @@ -238,6 +243,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_resident: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
Expand All @@ -250,6 +256,7 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
preparation_max_allocated: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
Expand Down
Loading

0 comments on commit 995abe1

Please sign in to comment.