Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Adding Dispute Participation Metrics #6838

Merged
merged 15 commits into from
Mar 11, 2023
Merged
3 changes: 2 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,12 +916,13 @@ impl Initialized {
} else {
self.metrics.on_queued_best_effort_participation();
}
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
let r = self
.participation
.queue_participation(
ctx,
priority,
ParticipationRequest::new(new_state.candidate_receipt().clone(), session),
ParticipationRequest::new(new_state.candidate_receipt().clone(), session, request_timer),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My first reaction was that request_timer could perhaps be initialized inside ParticipationRequest::new. But after looking at it, doing it like this instead of hiding it inside ParticipationRequest::new makes it more explicit that we are starting a timer here. 👍

)
.await;
log_error(r)?;
Expand Down
2 changes: 2 additions & 0 deletions node/core/dispute-coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem {
?candidate_hash,
"Found valid dispute, with no vote from us on startup - participating."
);
let request_timer = Arc::new(self.metrics.time_participation_pipeline());
gilescope marked this conversation as resolved.
Show resolved Hide resolved
participation_requests.push((
ParticipationPriority::with_priority_if(is_included),
ParticipationRequest::new(
vote_state.votes().candidate_receipt.clone(),
session,
request_timer,
),
));
}
Expand Down
15 changes: 5 additions & 10 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,19 @@ impl Participation {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let request_timer = self.metrics.time_participation_pipeline();

// Participation already running - we can ignore that request:
if self.running_participations.contains(req.candidate_hash()) {
return Ok(())
}
// Available capacity - participate right away (if we already have a recent block):
if let Some((_, h)) = self.recent_block {
if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
self.fork_participation(ctx, req, h, request_timer)?;
self.fork_participation(ctx, req, h)?;
return Ok(())
}
}
// Out of capacity/no recent block yet - queue:
self.queue.queue(ctx.sender(), priority, req, request_timer).await
self.queue.queue(ctx.sender(), priority, req).await
}

/// Message from a worker task was received - get the outcome.
Expand Down Expand Up @@ -243,9 +241,9 @@ impl Participation {
recent_head: Hash,
) -> FatalResult<()> {
while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS {
let (maybe_req, maybe_timer) = self.queue.dequeue();
let maybe_req = self.queue.dequeue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why do we need this variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! That was an artifact from a discarded implementation.

if let Some(req) = maybe_req {
self.fork_participation(ctx, req, recent_head, maybe_timer)?;
self.fork_participation(ctx, req, recent_head)?;
} else {
break
}
Expand All @@ -259,7 +257,6 @@ impl Participation {
ctx: &mut Context,
req: ParticipationRequest,
recent_head: Hash,
request_timer: Option<prometheus::HistogramTimer>,
) -> FatalResult<()> {
let participation_timer = self.metrics.time_participation();
if self.running_participations.insert(*req.candidate_hash()) {
Expand All @@ -271,7 +268,6 @@ impl Participation {
sender,
recent_head,
req,
request_timer,
participation_timer,
)
.boxed(),
Expand All @@ -286,8 +282,7 @@ async fn participate(
mut result_sender: WorkerMessageSender,
mut sender: impl overseer::DisputeCoordinatorSenderTrait,
block_hash: Hash,
req: ParticipationRequest,
_request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
req: ParticipationRequest, // Sends metric data via request_timer field when dropped
_participation_timer: Option<prometheus::HistogramTimer>, // Sends metric data when dropped
) {
#[cfg(test)]
Expand Down
58 changes: 27 additions & 31 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -60,21 +60,17 @@ pub struct Queues {
/// Priority queue.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Timer handle for each participation request. Stored to measure full request
/// completion time. Optimally these would have been stored in the participation
/// request itself, but HistogramTimer doesn't implement the Clone trait.
request_timers: BTreeMap<CandidateComparator, Option<prometheus::HistogramTimer>>,

/// Handle for recording queues data in metrics
metrics: Metrics,
}

/// A dispute participation request that can be queued.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Clone)]
tdimitrov marked this conversation as resolved.
Show resolved Hide resolved
pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Arc<Option<prometheus::HistogramTimer>> // Sends metric data when request is dropped
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
Expand Down Expand Up @@ -118,8 +114,8 @@ pub enum QueueError {

impl ParticipationRequest {
/// Create a new `ParticipationRequest` to be queued.
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session }
pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex, request_timer: Arc<Option<prometheus::HistogramTimer>>) -> Self {
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, _request_timer: request_timer }
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
Expand All @@ -135,6 +131,20 @@ impl ParticipationRequest {
let Self { candidate_hash, candidate_receipt, .. } = self;
(candidate_hash, candidate_receipt)
}
// For tests we want to check whether requests are equal, but the
// request_timer field of ParticipationRequest doesn't implement
// Eq. This helper checks whether all of the other fields are equal,
// which is sufficient.
#[cfg(test)]
pub fn functionally_equal(&self, other: ParticipationRequest) -> bool {
Copy link
Contributor

@mrcnski mrcnski Mar 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I had an internal struggle about this, but I think we should just implement Eq on this type. I believe that already implies functional equality. I read the docs for PartialEq, and there is no restriction mentioned about having to include every field in the equality comparison.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! I hadn't thought of it that way. But it does make sense. I shall make changes.

if &self.candidate_receipt == other.candidate_receipt() &&
&self.candidate_hash == other.candidate_hash() &&
self.session == other.session()
Copy link
Contributor

@mrcnski mrcnski Mar 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would destructure self into its fields, so that if a field is ever added in the future we don't forget to add it to the comparison here. It would look something like this:

let ParticipationRequest { candidate_receipt, candidate_hash, session, _request_timer } = self;
/* do the comparison */

Credit to @eskimor for teaching me this!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Also, instead of if cond { return true; } false, you can simplify it to just cond. :) I think clippy should warn about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Credit to @eskimor for teaching me this!

+1 :)

{
return true;
}
false
}
}

impl Queues {
Expand All @@ -143,7 +153,6 @@ impl Queues {
Self {
best_effort: BTreeMap::new(),
priority: BTreeMap::new(),
request_timers: BTreeMap::new(),
metrics,
}
}
Expand All @@ -159,35 +168,27 @@ impl Queues {
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
) -> Result<()> {
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req, timer)?;
self.queue_with_comparator(comparator, priority, req)?;
Ok(())
}

/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
/// We also get the corresponding request timer, if any.
pub fn dequeue(
&mut self,
) -> (Option<ParticipationRequest>, Option<prometheus::HistogramTimer>) {
if let Some((comp, req)) = self.pop_priority() {
) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
self.metrics.report_priority_queue_size(self.priority.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer)
}
return (Some(req), None)
return Some(req.1)
}
if let Some((comp, req)) = self.pop_best_effort() {
if let Some(req) = self.pop_best_effort() {
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
if let Some(maybe_timer) = self.request_timers.remove(&comp) {
return (Some(req), maybe_timer)
}
return (Some(req), None)
return Some(req.1)
}
(None, None)
None
}

/// Reprioritizes any participation requests pertaining to the
Expand Down Expand Up @@ -223,17 +224,13 @@ impl Queues {
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
timer: Option<prometheus::HistogramTimer>,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
if let None = self.best_effort.remove(&comparator) {
// Only insert new timer if request wasn't in either queue
self.request_timers.insert(comparator, timer);
}
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
Expand All @@ -247,7 +244,6 @@ impl Queues {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
self.request_timers.insert(comparator, timer);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
Expand Down
Loading