This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Adding Dispute Participation Metrics #6838

Merged: 15 commits, Mar 11, 2023
5 changes: 3 additions & 2 deletions node/core/dispute-coordinator/src/initialized.rs
@@ -96,7 +96,7 @@ impl Initialized {
let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem;

let (participation_sender, participation_receiver) = mpsc::channel(1);
let participation = Participation::new(participation_sender);
let participation = Participation::new(participation_sender, metrics.clone());
let highest_session = rolling_session_window.latest_session();

Self {
@@ -916,12 +916,13 @@ impl Initialized {
} else {
self.metrics.on_queued_best_effort_participation();
}
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
let r = self
.participation
.queue_participation(
ctx,
priority,
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
ParticipationRequest::new(new_state.candidate_receipt().clone(), session, request_timer),
Contributor:
My first reaction was that request_timer could perhaps be initialized in ParticipationRequest::new. But after looking at it, doing it like this instead of hiding it inside ParticipationRequest::new makes it more explicit that we are starting a timer here. 👍

)
.await;
log_error(r)?;
2 changes: 2 additions & 0 deletions node/core/dispute-coordinator/src/lib.rs
@@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
request_timer,
),
));
}
68 changes: 68 additions & 0 deletions node/core/dispute-coordinator/src/metrics.rs
@@ -32,6 +32,16 @@ struct MetricsInner {
vote_cleanup_time: prometheus::Histogram,
/// Number of refrained participations.
refrained_participations: prometheus::Counter<prometheus::U64>,
/// Distribution of participation durations.
participation_durations: prometheus::Histogram,
/// Measures the duration of the full participation pipeline: From when
/// a participation request is first queued to when participation in the
/// requested dispute is complete.
participation_pipeline_durations: prometheus::Histogram,
/// Size of participation priority queue
participation_priority_queue_size: prometheus::Gauge<prometheus::U64>,
/// Size of participation best effort queue
participation_best_effort_queue_size: prometheus::Gauge<prometheus::U64>,
}

/// Candidate validation metrics.
@@ -96,6 +106,36 @@ impl Metrics {
metrics.refrained_participations.inc();
}
}

/// Provide a timer for participation durations which updates on drop.
pub(crate) fn time_participation(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer())
}

/// Provide a timer for participation pipeline durations which updates on drop.
pub(crate) fn time_participation_pipeline(
&self,
) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0
.as_ref()
.map(|metrics| metrics.participation_pipeline_durations.start_timer())
}

/// Set the priority_queue_size metric
pub fn report_priority_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_priority_queue_size.set(size);
}
}

/// Set the best_effort_queue_size metric
pub fn report_best_effort_queue_size(&self, size: u64) {
if let Some(metrics) = &self.0 {
metrics.participation_best_effort_queue_size.set(size);
}
}
}

impl metrics::Metrics for Metrics {
@@ -163,6 +203,34 @@ impl metrics::Metrics for Metrics {
))?,
registry,
)?,
participation_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_durations",
"Time spent within fn Participation::participate",
)
)?,
registry,
)?,
participation_pipeline_durations: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_parachain_dispute_participation_pipeline_durations",
"Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.",
)
)?,
registry,
)?,
participation_priority_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size",
Contributor:
nit: Should the metric names be updated to match the new variable names? e.g. "polkadot_parachain_dispute_participation_priority_queue_size"

"Number of disputes waiting for local participation in the priority queue.")?,
registry,
)?,
participation_best_effort_queue_size: prometheus::register(
prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size",
"Number of disputes waiting for local participation in the best effort queue.")?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
26 changes: 21 additions & 5 deletions node/core/dispute-coordinator/src/participation/mod.rs
@@ -46,6 +46,9 @@ mod queues;
use queues::Queues;
pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

/// How many participation processes do we want to run in parallel the most.
///
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
@@ -69,6 +72,8 @@ pub struct Participation {
worker_sender: WorkerMessageSender,
/// Some recent block for retrieving validation code from chain.
recent_block: Option<(BlockNumber, Hash)>,
/// Metrics handle cloned from Initialized
metrics: Metrics,
}

/// Message from worker tasks.
@@ -133,12 +138,13 @@ impl Participation {
/// The passed in sender will be used by background workers to communicate back their results.
/// The calling context should make sure to call `Participation::on_worker_message()` for the
/// received messages.
pub fn new(sender: WorkerMessageSender) -> Self {
pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self {
Self {
running_participations: HashSet::new(),
queue: Queues::new(),
queue: Queues::new(metrics.clone()),
worker_sender: sender,
recent_block: None,
metrics,
}
}

@@ -235,7 +241,8 @@ impl Participation {
recent_head: Hash,
) -> FatalResult<()> {
while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
if let Some(req) = self.queue.dequeue() {
let maybe_req = self.queue.dequeue();
Contributor:
nit: Why do we need this variable?

Contributor Author:
Good catch! That was an artifact from a discarded implementation.

if let Some(req) = maybe_req {
self.fork_participation(ctx, req, recent_head)?;
} else {
break
@@ -251,11 +258,19 @@ impl Participation {
req: ParticipationRequest,
recent_head: Hash,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
let sender = ctx.sender().clone();
ctx.spawn(
"participation-worker",
participate(self.worker_sender.clone(), sender, recent_head, req).boxed(),
participate(
self.worker_sender.clone(),
sender,
recent_head,
req,
participation_timer,
)
.boxed(),
)
.map_err(FatalError::SpawnFailed)?;
}
@@ -267,7 +282,8 @@ async fn participate(
mut result_sender: WorkerMessageSender,
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
// Hack for tests, so we get recovery messages not too early.
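Both `req` (through its request timer field) and `_participation_timer` rely on the drop-based timers provided by the prometheus crate: `Histogram::start_timer()` returns a `HistogramTimer` that observes the elapsed time into its histogram when it is dropped, so the durations are recorded automatically once these values eventually go out of scope. A minimal, self-contained sketch of that pattern, assuming the plain `prometheus` crate (the metric name and `do_work` helper are illustrative, not taken from the PR):

```rust
use prometheus::{Histogram, HistogramOpts, Registry};

fn do_work() {
    std::thread::sleep(std::time::Duration::from_millis(10));
}

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    // Hypothetical histogram, registered the same way the dispute metrics are.
    let durations = Histogram::with_opts(HistogramOpts::new(
        "example_task_durations",
        "Time spent in some unit of work",
    ))?;
    registry.register(Box::new(durations.clone()))?;

    {
        // Start the timer; no explicit `observe` call is needed.
        let _timer = durations.start_timer();
        do_work();
    } // `_timer` is dropped here and records the elapsed seconds into `durations`.

    assert_eq!(durations.get_sample_count(), 1);
    Ok(())
}
```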
54 changes: 46 additions & 8 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
@@ -25,6 +25,9 @@ use crate::{
LOG_TARGET,
};

use crate::metrics::Metrics;
use polkadot_node_subsystem_util::metrics::prometheus::prometheus;

#[cfg(test)]
mod tests;

@@ -56,14 +59,18 @@ pub struct Queues {

/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Handle for recording queues data in metrics
metrics: Metrics,
}

/// A dispute participation request that can be queued.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Arc<Option<prometheus::HistogramTimer>> // Sends metric data when request is dropped
}
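A note on the `Arc` wrapper: `ParticipationRequest` derives `Clone`, while `HistogramTimer` does not, so the timer is shared behind an `Arc`. The practical effect is that clones of a request share a single timer and the pipeline duration is observed exactly once, when the last clone is dropped. A small illustration of that behaviour, assuming the plain `prometheus` crate (names are illustrative, not from the PR):

```rust
use prometheus::{Histogram, HistogramOpts};
use std::sync::Arc;

fn main() -> prometheus::Result<()> {
    let hist = Histogram::with_opts(HistogramOpts::new(
        "example_pipeline_durations",
        "Illustration of an Arc-shared drop timer",
    ))?;

    // Shared timer, analogous to the request timer field above.
    let timer = Arc::new(Some(hist.start_timer()));
    let clone_a = Arc::clone(&timer);
    let clone_b = Arc::clone(&timer);

    drop(timer);
    drop(clone_a);
    assert_eq!(hist.get_sample_count(), 0); // nothing observed yet
    drop(clone_b); // last owner gone: the inner HistogramTimer drops and observes
    assert_eq!(hist.get_sample_count(), 1); // exactly one duration recorded

    Ok(())
}
```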

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
@@ -107,8 +114,8 @@ pub enum QueueError {

impl ParticipationRequest {
/// Create a new `ParticipationRequest` to be queued.
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex, request_timer: Arc<Option<prometheus::HistogramTimer>>) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, _request_timer: request_timer }
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
@@ -124,12 +131,30 @@ impl ParticipationRequest {
let Self { candidate_hash, candidate_receipt, .. } = self;
(candidate_hash, candidate_receipt)
}
// For tests we want to check whether requests are equal, but the
// request_timer field of ParticipationRequest doesn't implement
// eq. This helper checks whether all other fields are equal,
// which is sufficient.
#[cfg(test)]
pub fn functionally_equal(&self, other: ParticipationRequest) -> bool {
mrcnski (Contributor), Mar 10, 2023:
nit: I had an internal struggle about this, but I think we should just impl eq on this type. I believe that already implies functional equality. I read the docs for PartialEq, and there is no restriction mentioned about having to include every field in the equality comparison.

Contributor Author:
Interesting! I hadn't thought of it that way. But it does make sense. I shall make changes.

if &self.candidate_receipt == other.candidate_receipt() &&
&self.candidate_hash == other.candidate_hash() &&
self.session == other.session()
mrcnski (Contributor), Mar 10, 2023:
I would destructure self into its fields, so that if a field is ever added in the future we don't forget to add it to the comparison here. It would look something like this:

let ParticipationRequest { candidate_receipt, candidate_hash, session, _request_timer } = self;
/* do the comparison */

Credit to @eskimor for teaching me this!

Contributor:
nit: Also, instead of if cond { return true; } false, you can simplify it to just cond. :) I think clippy should warn about this.

Contributor:

> Credit to @eskimor for teaching me this!

+1 :)

{
return true;
}
false
}
}
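For reference, the refactor the reviewers sketch above (a manual `PartialEq` that destructures `self` and simply ignores the timer field) could look roughly like the following; it is an illustrative sketch, not the code that ended up being merged:

```rust
// Sketch only: equality that ignores `_request_timer`, which carries no
// identity. Destructuring `self` makes the compiler flag any field that is
// added later but not considered in the comparison.
impl PartialEq for ParticipationRequest {
    fn eq(&self, other: &Self) -> bool {
        let ParticipationRequest { candidate_hash, candidate_receipt, session, _request_timer } =
            self;
        candidate_hash == other.candidate_hash() &&
            candidate_receipt == other.candidate_receipt() &&
            *session == other.session()
    }
}

impl Eq for ParticipationRequest {}
```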

impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
pub fn new(metrics: Metrics) -> Self {
Self {
best_effort: BTreeMap::new(),
priority: BTreeMap::new(),
metrics,
}
}

/// Will put message in queue, either priority or best effort depending on priority.
Expand All @@ -152,11 +177,18 @@ impl Queues {

/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
pub fn dequeue(
&mut self,
) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
return Some(req.1)
}
if let Some(req) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
return Some(req.1)
}
self.pop_best_effort().map(|d| d.1)
None
}

/// Reprioritizes any participation requests pertaining to the
@@ -180,6 +212,9 @@ }
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
// Report changes to both queue sizes
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}
@@ -197,6 +232,8 @@ impl Queues {
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
@@ -207,6 +244,7 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
}