Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into ao-5535-followup
Browse files Browse the repository at this point in the history
* master:
  [ci] fix build-implementers-guide (#6335)
  Rate limit improvements (#6315)
  Add PVF module documentation (#6293)
  Update async-trait version to v0.1.58 (#6319)
  Extend lower bound of `manage_lease_period_start` from `runtime_common::slots` (#6318)
  Add `starts_with` to v0 and v1 MultiLocation (#6311)
  Remove the `wasmtime` feature flag (companion for substrate#12684) (#6268)
  [ci] fix build implementers guide (#6306)
  Change best effort queue behaviour in `dispute-coordinator` (#6275)
  Dedup subsystem name (#6305)
  Add Helikon boot nodes for Polkadot, Kusama and Westend. (#6240)
  Provisioner should ignore unconfirmed disputes (#6294)
  [ci] Improve pipeline stopper (#6300)
  • Loading branch information
ordian committed Nov 24, 2022
2 parents 89b03f6 + e1cd7ce commit c96630c
Show file tree
Hide file tree
Showing 42 changed files with 901 additions and 530 deletions.
18 changes: 17 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,24 @@ deploy-parity-testnet:
PR_NUM: "${PR_NUM}"
trigger:
project: "parity/infrastructure/ci_cd/pipeline-stopper"
branch: "as-improve"

.cancel-pipeline-test-linux-stable:
remove-cancel-pipeline-message:
stage: .post
rules:
- if: $CI_COMMIT_REF_NAME =~ /^[0-9]+$/ # PRs
variables:
PROJECT_ID: "${CI_PROJECT_ID}"
PROJECT_NAME: "${CI_PROJECT_NAME}"
PIPELINE_ID: "${CI_PIPELINE_ID}"
FAILED_JOB_URL: "https://gitlab.com"
FAILED_JOB_NAME: "nope"
PR_NUM: "${CI_COMMIT_REF_NAME}"
trigger:
project: "parity/infrastructure/ci_cd/pipeline-stopper"
branch: "as-improve"

cancel-pipeline-test-linux-stable:
extends: .cancel-pipeline-template
needs:
- job: test-linux-stable
Expand Down
367 changes: 184 additions & 183 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master",
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }

[features]
default = ["wasmtime", "db", "cli", "hostperfcheck", "full-node", "trie-memory-tracker", "polkadot-native"]
wasmtime = ["sc-cli/wasmtime"]
default = ["db", "cli", "hostperfcheck", "full-node", "trie-memory-tracker", "polkadot-native"]
db = ["service/db"]
cli = [
"clap",
Expand Down
4 changes: 3 additions & 1 deletion node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,9 @@ impl Initialized {

let _ = tx.send(
get_active_with_status(recent_disputes.into_iter(), now)
.map(|(k, _)| k)
.map(|((session_idx, candidate_hash), dispute_status)| {
(session_idx, candidate_hash, dispute_status)
})
.collect(),
);
},
Expand Down
185 changes: 88 additions & 97 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use std::{cmp::Ordering, collections::BTreeMap};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -50,25 +47,14 @@ const PRIORITY_QUEUE_SIZE: usize = 20_000;
#[cfg(test)]
const PRIORITY_QUEUE_SIZE: usize = 2;

/// Type for counting how often a candidate was added to the best effort queue.
type BestEffortCount = u32;

/// Queues for dispute participation.
/// In both queues we have a strict ordering of candidates and participation will
/// happen in that order. Refer to `CandidateComparator` for details on the ordering.
pub struct Queues {
/// Set of best effort participation requests.
///
/// Note that as size is limited to `BEST_EFFORT_QUEUE_SIZE` we simply do a linear search for
/// the entry with the highest `added_count` to determine what dispute to participate next in.
///
/// This mechanism leads to an amplifying effect - the more validators already participated,
/// the more likely it becomes that more validators will participate soon, which should lead to
/// a quick resolution of disputes, even in the best effort queue.
best_effort: HashMap<CandidateHash, BestEffortEntry>,
best_effort: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Priority queue.
///
/// In the priority queue, we have a strict ordering of candidates and participation will
/// happen in that order.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,
}

Expand Down Expand Up @@ -143,14 +129,13 @@ impl ParticipationRequest {
impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: HashMap::new(), priority: BTreeMap::new() }
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
}

/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it considered priority now, otherwise the `added_count` on the best effort queue will be
/// bumped.
/// if it is considered priority now.
///
/// Returns error in case a queue was found full already.
pub async fn queue(
Expand All @@ -159,94 +144,76 @@ impl Queues {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let comparator = match priority {
ParticipationPriority::BestEffort => None,
ParticipationPriority::Priority =>
CandidateComparator::new(sender, &req.candidate_receipt).await?,
};
self.queue_with_comparator(comparator, req)?;
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req)?;
Ok(())
}

/// Get the next best request for dispute participation
///
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
return Some(req.1)
}
self.pop_best_effort()
self.pop_best_effort().map(|d| d.1)
}

fn queue_with_comparator(
&mut self,
comparator: Option<CandidateComparator>,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if let Some(comparator) = comparator {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&req.candidate_hash);
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
// add in in best effort too.
return Ok(())
}
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
// Note: The request might have been added to priority in a previous call already, we
// take care of that case in `dequeue` (more efficient).
self.best_effort
.entry(req.candidate_hash)
.or_insert(BestEffortEntry { req, added_count: 0 })
.added_count += 1;
self.best_effort.insert(comparator, req);
}
Ok(())
}

/// Get the next best from the best effort queue.
///
/// If there are multiple best - just pick one.
fn pop_best_effort(&mut self) -> Option<ParticipationRequest> {
let best = self.best_effort.iter().reduce(|(hash1, entry1), (hash2, entry2)| {
if entry1.added_count > entry2.added_count {
(hash1, entry1)
} else {
(hash2, entry2)
}
});
if let Some((best_hash, _)) = best {
let best_hash = best_hash.clone();
self.best_effort.remove(&best_hash).map(|e| e.req)
} else {
None
}
/// Get best from the best effort queue.
fn pop_best_effort(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.best_effort)
}

/// Get best priority queue entry.
fn pop_priority(&mut self) -> Option<ParticipationRequest> {
fn pop_priority(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.priority)
}

// `pop_best_effort` and `pop_priority` do the same but on different `BTreeMap`s. This function has
// the extracted implementation
fn pop_impl(
target: &mut BTreeMap<CandidateComparator, ParticipationRequest>,
) -> Option<(CandidateComparator, ParticipationRequest)> {
// Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple:
// priority.pop_first().
if let Some((comparator, _)) = self.priority.iter().next() {
// target.pop_first().
if let Some((comparator, _)) = target.iter().next() {
let comparator = comparator.clone();
self.priority.remove(&comparator)
target
.remove(&comparator)
.map(|participation_request| (comparator, participation_request))
} else {
None
}
}
}

/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
}

/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
Expand All @@ -266,9 +233,12 @@ struct BestEffortEntry {
#[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
struct CandidateComparator {
/// Block number of the relay parent.
/// Block number of the relay parent. It's wrapped in an `Option<>` because there are cases when
/// it can't be obtained. For example when the node is lagging behind and new leaves are received
/// with a slight delay. Candidates with unknown relay parent are treated with the lowest priority.
///
/// Important, so we will be participating in oldest disputes first.
/// The order enforced by `CandidateComparator` is important because we want to participate in
/// the oldest disputes first.
///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The
Expand All @@ -277,8 +247,10 @@ struct CandidateComparator {
/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
relay_parent_block_number: Option<BlockNumber>,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates with the
/// same relay parent block number. Candidates without `relay_parent_block_number` are ordered by
/// the `candidate_hash` (and treated with the lowest priority, as already mentioned).
candidate_hash: CandidateHash,
}

Expand All @@ -287,33 +259,35 @@ impl CandidateComparator {
///
/// Useful for testing.
#[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
pub fn new_dummy(block_number: Option<BlockNumber>, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash }
}

/// Create a candidate comparator for a given candidate.
///
/// Returns:
/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a
/// `FatalError` in case the chain API call fails with an unexpected error.
/// - `Ok(CandidateComparator{Some(relay_parent_block_number), candidate_hash})` when the
/// relay parent can be obtained. This is the happy case.
/// - `Ok(CandidateComparator{None, candidate_hash})` in case the candidate's relay parent
/// can't be obtained.
/// - `FatalError` in case the chain API call fails with an unexpected error.
pub async fn new(
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
candidate: &CandidateReceipt,
) -> FatalResult<Option<Self>> {
) -> FatalResult<Self> {
let candidate_hash = candidate.hash();
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
);
return Ok(None)
},
Some(n) => n,
};
let n = get_block_number(sender, candidate.descriptor().relay_parent).await?;

if n.is_none() {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator` \
with an empty relay parent block number will be provided!"
);
}

Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
Ok(CandidateComparator { relay_parent_block_number: n, candidate_hash })
}
}

Expand All @@ -333,11 +307,28 @@ impl PartialOrd for CandidateComparator {

impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
return match (self.relay_parent_block_number, other.relay_parent_block_number) {
(None, None) => {
// No relay parents for both -> compare hashes
self.candidate_hash.cmp(&other.candidate_hash)
},
(Some(self_relay_parent_block_num), Some(other_relay_parent_block_num)) => {
match self_relay_parent_block_num.cmp(&other_relay_parent_block_num) {
// if the relay parent is the same for both -> compare hashes
Ordering::Equal => self.candidate_hash.cmp(&other.candidate_hash),
// if not - return the result from comparing the relay parent block numbers
o => return o,
}
},
(Some(_), None) => {
// Candidates with known relay parents are always with priority
Ordering::Less
},
(None, Some(_)) => {
// Ditto
Ordering::Greater
},
}
self.candidate_hash.cmp(&other.candidate_hash)
}
}

Expand Down
Loading

0 comments on commit c96630c

Please sign in to comment.