Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Handling timers for repeat dispute participation requests #6901

Merged
merged 26 commits into from Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
107f971
Added participation and queue sizes metrics
BradleyOlson64 Mar 7, 2023
a8186fc
First draft of all metric code
BradleyOlson64 Mar 7, 2023
b6cb2c7
Tests pass
BradleyOlson64 Mar 7, 2023
faa7eb5
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Mar 7, 2023
bc7049e
Changed Metrics to field on participation + queues
BradleyOlson64 Mar 8, 2023
7b0bfc0
fmt
BradleyOlson64 Mar 8, 2023
56e7723
Improving naming
BradleyOlson64 Mar 9, 2023
4185d4d
Refactor, placing timer in ParticipationRequest
BradleyOlson64 Mar 9, 2023
20ceda9
fmt
BradleyOlson64 Mar 9, 2023
02e5608
Final cleanup
BradleyOlson64 Mar 10, 2023
6444e6d
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Mar 10, 2023
008063f
Revert "Final cleanup"
BradleyOlson64 Mar 10, 2023
207bb2a
Changing metric names
BradleyOlson64 Mar 10, 2023
3fcb639
Implementing Eq only for unit tests
BradleyOlson64 Mar 10, 2023
ef611aa
fmt
BradleyOlson64 Mar 10, 2023
1a4eb45
Removing Clone trait from ParticipationRequest
BradleyOlson64 Mar 14, 2023
eed5535
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Mar 14, 2023
27fd59f
fmt
BradleyOlson64 Mar 14, 2023
d8e838f
Moved clone functionality to tests helper
BradleyOlson64 Mar 15, 2023
9bd4dcf
fmt
BradleyOlson64 Mar 15, 2023
fea1997
Fixing dropped timers on repeat requests
BradleyOlson64 Mar 16, 2023
a51c115
Merge branch 'master' of https://github.com/paritytech/polkadot into …
BradleyOlson64 Mar 16, 2023
0ac7f5b
Keep older best effort timers
BradleyOlson64 Mar 16, 2023
e7f8112
Removing comment redundency and explaining better
BradleyOlson64 Mar 17, 2023
70b21f3
Updating queue() to use single mem read
BradleyOlson64 Mar 20, 2023
81de5d3
fmt
BradleyOlson64 Mar 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 49 additions & 20 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::BTreeMap, collections::btree_map::Entry};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct ParticipationRequest {
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
_request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped
request_timer: Option<prometheus::HistogramTimer>, // Sends metric data when request is dropped
}

/// Whether a `ParticipationRequest` should be put on best-effort or the priority queue.
Expand Down Expand Up @@ -119,12 +119,7 @@ impl ParticipationRequest {
session: SessionIndex,
request_timer: Option<prometheus::HistogramTimer>,
) -> Self {
Self {
candidate_hash: candidate_receipt.hash(),
candidate_receipt,
session,
_request_timer: request_timer,
}
Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer }
}

pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt {
Expand All @@ -147,15 +142,11 @@ impl ParticipationRequest {
#[cfg(test)]
impl PartialEq for ParticipationRequest {
fn eq(&self, other: &Self) -> bool {
let ParticipationRequest {
candidate_receipt,
candidate_hash,
session: _session,
_request_timer,
} = self;
let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } =
self;
candidate_receipt == other.candidate_receipt() &&
candidate_hash == other.candidate_hash() &&
self.session == other.session()
*session == other.session()
}
}
#[cfg(test)]
Expand Down Expand Up @@ -227,19 +218,47 @@ impl Queues {
Ok(())
}

/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it is considered priority now.
///
/// Returns error in case a queue was found full already.
///
/// # Request timers
///
/// [`ParticipationRequest`]s contain request timers.
/// Where an old request would be replaced by a new one, we keep the old request.
/// This prevents request timers from resetting on each new request.
fn queue_with_comparator(
&mut self,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
mut req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
// Remove any best effort entry, using it to replace our new
// request.
if let Some(older_request) = self.best_effort.remove(&comparator) {
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
}
req = older_request;
}
// Keeping old request if any.
match self.priority.entry(comparator) {
Entry::Occupied(_) => {
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
}
},
Entry::Vacant(vac) => {
vac.insert(req);
}
}
self.metrics.report_priority_queue_size(self.priority.len() as u64);
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
} else {
Expand All @@ -251,7 +270,17 @@ impl Queues {
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
self.best_effort.insert(comparator, req);
// Keeping old request if any.
match self.best_effort.entry(comparator) {
Entry::Occupied(_) => {
if let Some(timer) = req.request_timer {
timer.stop_and_discard();
}
},
Entry::Vacant(vac) => {
vac.insert(req);
}
}
self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64);
}
Ok(())
Expand Down
Expand Up @@ -46,7 +46,7 @@ fn clone_request(request: &ParticipationRequest) -> ParticipationRequest {
candidate_receipt: request.candidate_receipt.clone(),
candidate_hash: request.candidate_hash.clone(),
session: request.session,
_request_timer: None,
request_timer: None,
}
}

Expand Down