Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Dispute distribution improvements #3853

Merged
merged 3 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 16 additions & 1 deletion node/network/dispute-distribution/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ struct MetricsInner {
///
/// We both have successful imports and failed imports here.
imported_requests: CounterVec<U64>,

/// The duration of issued dispute request to response.
time_dispute_request: prometheus::Histogram,
}

impl Metrics {
Expand All @@ -61,7 +64,7 @@ impl Metrics {
}
}

/// Increment counter on served chunks.
/// Increment counter on served disputes.
pub fn on_received_request(&self) {
if let Some(metrics) = &self.0 {
metrics.received_requests.inc()
Expand All @@ -74,6 +77,11 @@ impl Metrics {
metrics.imported_requests.with_label_values(&[label]).inc()
}
}

/// Get a timer to time request/response duration.
pub fn time_dispute_request(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.time_dispute_request.start_timer())
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -106,6 +114,13 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
time_dispute_request: prometheus::register(
prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
"parachain_dispute_distribution_time_dispute_request",
"Time needed for dispute votes to get confirmed/fail getting transmitted.",
))?,
registry,
)?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think they are good enough.

};
Ok(Metrics(Some(metrics)))
}
Expand Down
16 changes: 12 additions & 4 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,15 @@ impl DisputeSender {
return Ok(())
},
Entry::Vacant(vacant) => {
let send_task =
SendTask::new(ctx, runtime, &self.active_sessions, self.tx.clone(), req)
.await?;
let send_task = SendTask::new(
ctx,
runtime,
&self.active_sessions,
self.tx.clone(),
req,
&self.metrics,
)
.await?;
vacant.insert(send_task);
},
}
Expand Down Expand Up @@ -140,7 +146,9 @@ impl DisputeSender {

for dispute in self.disputes.values_mut() {
if have_new_sessions || dispute.has_failed_sends() {
dispute.refresh_sends(ctx, runtime, &self.active_sessions).await?;
dispute
.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
.await?;
}
}

Expand Down
95 changes: 63 additions & 32 deletions node/network/dispute-distribution/src/sender/send_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use futures::{channel::mpsc, future::RemoteHandle, Future, FutureExt, SinkExt};

use polkadot_node_network_protocol::{
request_response::{
outgoing::RequestError,
v1::{DisputeRequest, DisputeResponse},
OutgoingRequest, OutgoingResult, Recipient, Requests,
},
IfDisconnected,
};
use polkadot_node_subsystem_util::runtime::RuntimeInfo;
use polkadot_node_subsystem_util::{metrics, runtime::RuntimeInfo};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex, ValidatorIndex,
};
Expand All @@ -38,7 +39,7 @@ use super::error::{Fatal, Result};

use crate::{
metrics::{FAILED, SUCCEEDED},
LOG_TARGET,
Metrics, LOG_TARGET,
};

/// Delivery status for a particular dispute.
Expand All @@ -57,6 +58,16 @@ pub struct SendTask {
/// Whether we have any tasks failed since the last refresh.
has_failed_sends: bool,

/// Total count of failed transmissions.
///
/// Used for issuing a warning, if that number gets above a certain threshold.
failed_count: usize,

/// Total number of initiated requests.
///
/// Used together with `failed_count` for issuing a warning on too many failed attempts.
send_count: usize,

/// Sender to be cloned for tasks.
tx: mpsc::Sender<TaskFinish>,
}
Expand Down Expand Up @@ -87,14 +98,14 @@ pub enum TaskResult {
/// Task was not able to get the request out to its peer.
///
/// It should be retried in that case.
Failed,
Failed(RequestError),
}

impl TaskResult {
pub fn as_metrics_label(&self) -> &'static str {
match self {
Self::Succeeded => SUCCEEDED,
Self::Failed => FAILED,
Self::Failed(_) => FAILED,
}
}
}
Expand All @@ -107,10 +118,17 @@ impl SendTask {
active_sessions: &HashMap<SessionIndex, Hash>,
tx: mpsc::Sender<TaskFinish>,
request: DisputeRequest,
metrics: &Metrics,
) -> Result<Self> {
let mut send_task =
Self { request, deliveries: HashMap::new(), has_failed_sends: false, tx };
send_task.refresh_sends(ctx, runtime, active_sessions).await?;
let mut send_task = Self {
request,
deliveries: HashMap::new(),
has_failed_sends: false,
tx,
failed_count: 0,
send_count: 0,
};
send_task.refresh_sends(ctx, runtime, active_sessions, metrics).await?;
Ok(send_task)
}

Expand All @@ -123,6 +141,7 @@ impl SendTask {
ctx: &mut Context,
runtime: &mut RuntimeInfo,
active_sessions: &HashMap<SessionIndex, Hash>,
metrics: &Metrics,
) -> Result<()> {
let new_authorities = self.get_relevant_validators(ctx, runtime, active_sessions).await?;

Expand All @@ -137,10 +156,12 @@ impl SendTask {

// Start any new tasks that are needed:
let new_statuses =
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone()).await?;
send_requests(ctx, self.tx.clone(), add_authorities, self.request.clone(), metrics)
.await?;

self.deliveries.extend(new_statuses.into_iter());
self.has_failed_sends = false;
self.send_count += new_statuses.len();
self.deliveries.extend(new_statuses.into_iter());
Ok(())
}

Expand All @@ -150,15 +171,38 @@ impl SendTask {
}

/// Handle a finished response waiting task.
///
/// Called by `DisputeSender` upon reception of the corresponding message from our spawned `wait_response_task`.
pub fn on_finished_send(&mut self, authority: &AuthorityDiscoveryId, result: TaskResult) {
match result {
TaskResult::Failed => {
tracing::warn!(
TaskResult::Failed(err) => {
tracing::debug!(
target: LOG_TARGET,
candidate = ?self.request.0.candidate_receipt.hash(),
?authority,
"Could not get our message out! If this keeps happening, then check chain whether the dispute made it there."
candidate_hash = %self.request.0.candidate_receipt.hash(),
%err,
"Error sending dispute statements to node."
);

self.failed_count += 1;
let error_rate = (100 * self.failed_count).checked_div(self.send_count).expect(
"We cannot receive a failed request, without having sent one first. qed.",
);
// 10% seems to be a sensible threshold to become alert - note that
// self.send_count gets increased in batches of the full validator set, so we don't
// need to account for a low send_count.
if error_rate > 10 {
tracing::warn!(
target: LOG_TARGET,
candidate_hash = %self.request.0.candidate_receipt.hash(),
last_authority = ?authority,
last_error = %err,
failed_count = ?self.failed_count,
total_attempts = ?self.send_count,
"Sending our dispute vote failed for more than 10% of total attempts!"
);
}

self.has_failed_sends = true;
// Remove state, so we know what to try again:
self.deliveries.remove(authority);
Expand Down Expand Up @@ -236,6 +280,7 @@ async fn send_requests<Context: SubsystemContext>(
tx: mpsc::Sender<TaskFinish>,
receivers: Vec<AuthorityDiscoveryId>,
req: DisputeRequest,
metrics: &Metrics,
) -> Result<HashMap<AuthorityDiscoveryId, DeliveryStatus>> {
let mut statuses = HashMap::with_capacity(receivers.len());
let mut reqs = Vec::with_capacity(receivers.len());
Expand All @@ -251,6 +296,7 @@ async fn send_requests<Context: SubsystemContext>(
req.0.candidate_receipt.hash(),
receiver.clone(),
tx.clone(),
metrics.time_dispute_request(),
);

let (remote, remote_handle) = fut.remote_handle();
Expand All @@ -273,28 +319,13 @@ async fn wait_response_task(
candidate_hash: CandidateHash,
receiver: AuthorityDiscoveryId,
mut tx: mpsc::Sender<TaskFinish>,
_timer: Option<metrics::prometheus::prometheus::HistogramTimer>,
) {
let result = pending_response.await;
let msg = match result {
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
%candidate_hash,
%receiver,
%err,
"Error sending dispute statements to node."
);
TaskFinish { candidate_hash, receiver, result: TaskResult::Failed }
},
Ok(DisputeResponse::Confirmed) => {
tracing::trace!(
target: LOG_TARGET,
%candidate_hash,
%receiver,
"Sending dispute message succeeded"
);
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded }
},
Err(err) => TaskFinish { candidate_hash, receiver, result: TaskResult::Failed(err) },
Ok(DisputeResponse::Confirmed) =>
TaskFinish { candidate_hash, receiver, result: TaskResult::Succeeded },
};
if let Err(err) = tx.feed(msg).await {
tracing::debug!(
Expand Down