From ea1916577e2a3768c01b1ab0f98ecd949f7cbeb6 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 31 Aug 2023 21:18:40 +0800
Subject: [PATCH] pd_client: add backoff for the reconnect retries (#15429)
 (#15471)

ref tikv/pd#6556, close tikv/tikv#15428

pd_client: add store-level backoff for the reconnect retries

Signed-off-by: ti-chi-bot
Signed-off-by: nolouch
Co-authored-by: ShuNing
Co-authored-by: nolouch
---
 Cargo.lock                            |   1 +
 components/pd_client/Cargo.toml       |   1 +
 components/pd_client/src/client_v2.rs |  14 ++--
 components/pd_client/src/metrics.rs   |  29 ++++++--
 components/pd_client/src/util.rs      | 100 +++++++++++++++++++-------
 5 files changed, 104 insertions(+), 41 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c29271fd417..92cf58491ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3812,6 +3812,7 @@ dependencies = [
  "log",
  "log_wrappers",
  "prometheus",
+ "prometheus-static-metric",
  "security",
  "semver 0.10.0",
  "serde",
diff --git a/components/pd_client/Cargo.toml b/components/pd_client/Cargo.toml
index c25e37f23b5..f46d6111c5d 100644
--- a/components/pd_client/Cargo.toml
+++ b/components/pd_client/Cargo.toml
@@ -19,6 +19,7 @@ lazy_static = "1.3"
 log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
 log_wrappers = { workspace = true }
 prometheus = { version = "0.13", features = ["nightly"] }
+prometheus-static-metric = "0.5"
 security = { workspace = true }
 semver = "0.10"
 serde = "1.0"
diff --git a/components/pd_client/src/client_v2.rs b/components/pd_client/src/client_v2.rs
index 3d17a94a494..8fdfc02e915 100644
--- a/components/pd_client/src/client_v2.rs
+++ b/components/pd_client/src/client_v2.rs
@@ -121,7 +121,7 @@ impl RawClient {
     /// Returns Ok(true) when a new connection is established.
     async fn maybe_reconnect(&mut self, ctx: &ConnectContext, force: bool) -> Result<bool> {
-        PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+        PD_RECONNECT_COUNTER_VEC.try_connect.inc();
 
         let start = Instant::now();
 
         let members = self.members.clone();
@@ -139,21 +139,15 @@ impl RawClient {
             .await
         {
             Err(e) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["failure"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.failure.inc();
                 return Err(e);
             }
             Ok(None) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["no-need"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.no_need.inc();
                 return Ok(false);
             }
             Ok(Some(tuple)) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["success"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.success.inc();
                 tuple
             }
         };
diff --git a/components/pd_client/src/metrics.rs b/components/pd_client/src/metrics.rs
index 57879a57d0e..1752f331b27 100644
--- a/components/pd_client/src/metrics.rs
+++ b/components/pd_client/src/metrics.rs
@@ -2,6 +2,21 @@
 
 use lazy_static::lazy_static;
 use prometheus::*;
+use prometheus_static_metric::*;
+
+make_static_metric! {
+    pub label_enum PDReconnectEventKind {
+        success,
+        failure,
+        no_need,
+        cancel,
+        try_connect,
+    }
+
+    pub struct PDReconnectEventCounterVec: IntCounter {
+        "type" => PDReconnectEventKind,
+    }
+}
 
 lazy_static! {
     pub static ref PD_REQUEST_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
@@ -22,12 +37,14 @@ lazy_static! {
         &["type"]
     )
     .unwrap();
-    pub static ref PD_RECONNECT_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
-        "tikv_pd_reconnect_total",
-        "Total number of PD reconnections.",
-        &["type"]
-    )
-    .unwrap();
+    pub static ref PD_RECONNECT_COUNTER_VEC: PDReconnectEventCounterVec =
+        register_static_int_counter_vec!(
+            PDReconnectEventCounterVec,
+            "tikv_pd_reconnect_total",
+            "Total number of PD reconnections.",
+            &["type"]
+        )
+        .unwrap();
     pub static ref PD_PENDING_HEARTBEAT_GAUGE: IntGauge = register_int_gauge!(
         "tikv_pd_pending_heartbeat_total",
         "Total number of pending region heartbeat"
diff --git a/components/pd_client/src/util.rs b/components/pd_client/src/util.rs
index 833affedbb9..8b66ecdf591 100644
--- a/components/pd_client/src/util.rs
+++ b/components/pd_client/src/util.rs
@@ -46,6 +46,7 @@ const MAX_RETRY_TIMES: u64 = 5;
 // The max duration when retrying to connect to leader. No matter if the
 // MAX_RETRY_TIMES is reached.
 const MAX_RETRY_DURATION: Duration = Duration::from_secs(10);
+const MAX_BACKOFF: Duration = Duration::from_secs(3);
 
 // FIXME: Use a request-independent way to handle reconnection.
 const GLOBAL_RECONNECT_INTERVAL: Duration = Duration::from_millis(100); // 0.1s
@@ -106,6 +107,7 @@ pub struct Inner {
 
     pub tso: TimestampOracle,
     last_try_reconnect: Instant,
+    bo: ExponentialBackoff,
 }
 
 impl Inner {
@@ -197,6 +199,7 @@ impl Client {
                 pending_heartbeat: Arc::default(),
                 pending_buckets: Arc::default(),
                 last_try_reconnect: Instant::now(),
+                bo: ExponentialBackoff::new(GLOBAL_RECONNECT_INTERVAL),
                 tso,
             }),
             feature_gate: FeatureGate::default(),
@@ -322,18 +325,15 @@ impl Client {
     /// Note: Retrying too quickly will return an error due to cancellation.
     /// Please always try to reconnect after sending the request first.
     pub async fn reconnect(&self, force: bool) -> Result<()> {
-        PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+        PD_RECONNECT_COUNTER_VEC.try_connect.inc();
 
         let start = Instant::now();
         let future = {
             let inner = self.inner.rl();
-            if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
-            {
+            if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
                 // Avoid unnecessary updating.
                 // Prevent a large number of reconnections in a short time.
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["cancel"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.cancel.inc();
                 return Err(box_err!("cancel reconnection due to too small interval"));
             }
             let connector = PdConnector::new(inner.env.clone(), inner.security_mgr.clone());
@@ -354,37 +354,38 @@ impl Client {
 
         {
             let mut inner = self.inner.wl();
-            if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
-            {
+            if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
                 // There may be multiple reconnections that pass the read lock at the same time.
                 // Check again in the write lock to avoid unnecessary updating.
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["cancel"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.cancel.inc();
                 return Err(box_err!("cancel reconnection due to too small interval"));
             }
             inner.last_try_reconnect = start;
+            inner.bo.next_backoff();
         }
 
         slow_log!(start.saturating_elapsed(), "try reconnect pd");
         let (client, target_info, members, tso) = match future.await {
             Err(e) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["failure"])
-                    .inc();
+                PD_RECONNECT_COUNTER_VEC.failure.inc();
                 return Err(e);
            }
-            Ok(None) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["no-need"])
-                    .inc();
-                return Ok(());
-            }
-            Ok(Some(tuple)) => {
-                PD_RECONNECT_COUNTER_VEC
-                    .with_label_values(&["success"])
-                    .inc();
-                tuple
+            Ok(res) => {
+                // Reset the retry count.
+                {
+                    let mut inner = self.inner.wl();
+                    inner.bo.reset()
+                }
+                match res {
+                    None => {
+                        PD_RECONNECT_COUNTER_VEC.no_need.inc();
+                        return Ok(());
+                    }
+                    Some(tuple) => {
+                        PD_RECONNECT_COUNTER_VEC.success.inc();
+                        tuple
+                    }
+                }
             }
         };
 
@@ -861,6 +862,33 @@ impl PdConnector {
     }
 }
 
+/// Simple backoff strategy.
+struct ExponentialBackoff {
+    base: Duration,
+    interval: Duration,
+}
+
+impl ExponentialBackoff {
+    pub fn new(base: Duration) -> Self {
+        Self {
+            base,
+            interval: base,
+        }
+    }
+    pub fn next_backoff(&mut self) -> Duration {
+        self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
+        self.interval
+    }
+
+    pub fn get_interval(&self) -> Duration {
+        self.interval
+    }
+
+    pub fn reset(&mut self) {
+        self.interval = self.base;
+    }
+}
+
 pub fn trim_http_prefix(s: &str) -> &str {
     s.trim_start_matches("http://")
         .trim_start_matches("https://")
@@ -1005,8 +1033,11 @@ pub fn merge_bucket_stats<C: AsRef<[u8]>, I: AsRef<[u8]>>(
 mod test {
     use kvproto::metapb::BucketStats;
 
+    use super::*;
     use crate::{merge_bucket_stats, util::find_bucket_index};
 
+    const BASE_BACKOFF: Duration = Duration::from_millis(100);
+
     #[test]
     fn test_merge_bucket_stats() {
         #[allow(clippy::type_complexity)]
@@ -1122,4 +1153,23 @@ mod test {
         assert_eq!(find_bucket_index(b"k7", &keys), Some(4));
         assert_eq!(find_bucket_index(b"k8", &keys), Some(4));
     }
+
+    #[test]
+    fn test_exponential_backoff() {
+        let mut backoff = ExponentialBackoff::new(BASE_BACKOFF);
+        assert_eq!(backoff.get_interval(), BASE_BACKOFF);
+
+        assert_eq!(backoff.next_backoff(), 2 * BASE_BACKOFF);
+        assert_eq!(backoff.next_backoff(), Duration::from_millis(400));
+        assert_eq!(backoff.get_interval(), Duration::from_millis(400));
+
+        // Should not exceed MAX_BACKOFF
+        for _ in 0..20 {
+            backoff.next_backoff();
+        }
+        assert_eq!(backoff.get_interval(), MAX_BACKOFF);
+
+        backoff.reset();
+        assert_eq!(backoff.get_interval(), BASE_BACKOFF);
+    }
 }
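For readers skimming the diff, below is a minimal, self-contained sketch of the retry flow this patch introduces, assuming plain std::time types instead of TiKV's wrappers. The Reconnector type, its try_reconnect signature, and the simulated `succeeded` flag are illustrative only; the 100 ms base, the 3 s cap, the doubling on every attempt, and the reset after a successful reconnect mirror GLOBAL_RECONNECT_INTERVAL, MAX_BACKOFF, and ExponentialBackoff from the diff above.

use std::{
    cmp::min,
    thread,
    time::{Duration, Instant},
};

// Constants mirroring the patch: 100 ms base (GLOBAL_RECONNECT_INTERVAL) and a 3 s cap (MAX_BACKOFF).
const BASE: Duration = Duration::from_millis(100);
const MAX_BACKOFF: Duration = Duration::from_secs(3);

// Same shape as the ExponentialBackoff added to util.rs.
struct ExponentialBackoff {
    base: Duration,
    interval: Duration,
}

impl ExponentialBackoff {
    fn new(base: Duration) -> Self {
        Self { base, interval: base }
    }
    // Double the wait, capped at MAX_BACKOFF.
    fn next_backoff(&mut self) -> Duration {
        self.interval = min(self.interval * 2, MAX_BACKOFF);
        self.interval
    }
    fn get_interval(&self) -> Duration {
        self.interval
    }
    // A successful reconnect goes back to the base interval.
    fn reset(&mut self) {
        self.interval = self.base;
    }
}

// Illustrative stand-in for the client: the real code keeps
// `last_try_reconnect` and the backoff inside `Inner` behind a RwLock
// and performs the connect asynchronously.
struct Reconnector {
    last_try: Instant,
    bo: ExponentialBackoff,
}

impl Reconnector {
    fn new() -> Self {
        Self {
            last_try: Instant::now(),
            bo: ExponentialBackoff::new(BASE),
        }
    }

    // `succeeded` stands in for the outcome of the real connect future.
    fn try_reconnect(&mut self, succeeded: bool) -> Result<(), String> {
        let now = Instant::now();
        // Same gate as the patch: a retry inside the current backoff
        // window is cancelled instead of hammering PD.
        if now.saturating_duration_since(self.last_try) < self.bo.get_interval() {
            return Err("cancel reconnection due to too small interval".to_owned());
        }
        self.last_try = now;
        // Widen the window before the attempt; a success resets it below.
        self.bo.next_backoff();
        if succeeded {
            self.bo.reset();
            Ok(())
        } else {
            Err("reconnect failed".to_owned())
        }
    }
}

fn main() {
    let mut r = Reconnector::new();
    // Right after construction the 100 ms window is still open, so this attempt is cancelled.
    println!("{:?}", r.try_reconnect(true));
    thread::sleep(BASE);
    // Outside the window: the attempt runs, fails, and the window doubles to 200 ms.
    println!("{:?}", r.try_reconnect(false));
    println!("next window: {:?}", r.bo.get_interval());
}

Because the backoff lives in the client's Inner state, it is store-level: a TiKV whose PD connection keeps failing spaces its reconnect attempts out to at most 3 s, while a single successful reconnect immediately restores the 100 ms base interval.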