pd_client: add backoff for the reconnect retries (#15429) (#15471)
ref tikv/pd#6556, close #15428

pd_client: add store-level backoff for the reconnect retries

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: nolouch <nolouch@gmail.com>
ti-chi-bot and nolouch committed Aug 31, 2023
1 parent 0aed79f commit ea19165
Showing 5 changed files with 104 additions and 41 deletions.
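The patch gates PD reconnect attempts behind a per-client exponential backoff: each admitted reconnect attempt doubles the minimum wait before the next one (capped at 3 seconds), and a successful reconnect resets the wait to the base interval. Below is a minimal, self-contained sketch of that strategy; the type mirrors the `ExponentialBackoff` added in `util.rs` further down, while the `main` driver is illustrative only.

```rust
use std::time::Duration;

const GLOBAL_RECONNECT_INTERVAL: Duration = Duration::from_millis(100); // base wait, 0.1s
const MAX_BACKOFF: Duration = Duration::from_secs(3); // upper bound on the wait

/// Wait interval that doubles after every reconnect attempt, capped at MAX_BACKOFF.
struct ExponentialBackoff {
    base: Duration,
    interval: Duration,
}

impl ExponentialBackoff {
    fn new(base: Duration) -> Self {
        Self { base, interval: base }
    }

    /// Called when a reconnect attempt is admitted; widens the gate for the next one.
    fn next_backoff(&mut self) -> Duration {
        self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
        self.interval
    }

    /// An attempt arriving sooner than this since the last try is cancelled.
    fn get_interval(&self) -> Duration {
        self.interval
    }

    /// Called once a connection is successfully re-established.
    fn reset(&mut self) {
        self.interval = self.base;
    }
}

fn main() {
    let mut bo = ExponentialBackoff::new(GLOBAL_RECONNECT_INTERVAL);
    assert_eq!(bo.next_backoff(), Duration::from_millis(200)); // 0.1s -> 0.2s
    for _ in 0..20 {
        bo.next_backoff();
    }
    assert_eq!(bo.get_interval(), MAX_BACKOFF); // saturates at 3s
    bo.reset();
    assert_eq!(bo.get_interval(), GLOBAL_RECONNECT_INTERVAL); // success resets to the base
}
```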
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions components/pd_client/Cargo.toml
@@ -19,6 +19,7 @@ lazy_static = "1.3"
log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug"] }
log_wrappers = { workspace = true }
prometheus = { version = "0.13", features = ["nightly"] }
prometheus-static-metric = "0.5"
security = { workspace = true }
semver = "0.10"
serde = "1.0"
14 changes: 4 additions & 10 deletions components/pd_client/src/client_v2.rs
@@ -121,7 +121,7 @@ impl RawClient

/// Returns Ok(true) when a new connection is established.
async fn maybe_reconnect(&mut self, ctx: &ConnectContext, force: bool) -> Result<bool> {
- PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+ PD_RECONNECT_COUNTER_VEC.try_connect.inc();
let start = Instant::now();

let members = self.members.clone();
@@ -139,21 +139,15 @@ impl RawClient {
.await
{
Err(e) => {
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["failure"])
- .inc();
+ PD_RECONNECT_COUNTER_VEC.failure.inc();
return Err(e);
}
Ok(None) => {
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["no-need"])
- .inc();
+ PD_RECONNECT_COUNTER_VEC.no_need.inc();
return Ok(false);
}
Ok(Some(tuple)) => {
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["success"])
- .inc();
+ PD_RECONNECT_COUNTER_VEC.success.inc();
tuple
}
};
29 changes: 23 additions & 6 deletions components/pd_client/src/metrics.rs
@@ -2,6 +2,21 @@

use lazy_static::lazy_static;
use prometheus::*;
use prometheus_static_metric::*;

make_static_metric! {
pub label_enum PDReconnectEventKind {
success,
failure,
no_need,
cancel,
try_connect,
}

pub struct PDReconnectEventCounterVec: IntCounter {
"type" => PDReconnectEventKind,
}
}

lazy_static! {
pub static ref PD_REQUEST_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
@@ -22,12 +37,14 @@ lazy_static! {
&["type"]
)
.unwrap();
- pub static ref PD_RECONNECT_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
- "tikv_pd_reconnect_total",
- "Total number of PD reconnections.",
- &["type"]
- )
- .unwrap();
+ pub static ref PD_RECONNECT_COUNTER_VEC: PDReconnectEventCounterVec =
+ register_static_int_counter_vec!(
+ PDReconnectEventCounterVec,
+ "tikv_pd_reconnect_total",
+ "Total number of PD reconnections.",
+ &["type"]
+ )
+ .unwrap();
pub static ref PD_PENDING_HEARTBEAT_GAUGE: IntGauge = register_int_gauge!(
"tikv_pd_pending_heartbeat_total",
"Total number of pending region heartbeat"
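The metrics change swaps the label-based `IntCounterVec` for a static counter vec generated by `prometheus-static-metric`: `make_static_metric!` expands the label enum into one typed `IntCounter` field per value, so call sites write `PD_RECONNECT_COUNTER_VEC.success.inc()` instead of `with_label_values(&["success"]).inc()` and skip the per-call label lookup. A small standalone sketch of the same pattern follows; the metric name `demo_reconnect_total` and the enum/struct names are hypothetical, while the macros are the ones used in this diff.

```rust
use prometheus::*;
use prometheus_static_metric::*;

make_static_metric! {
    pub label_enum ReconnectEventKind {
        success,
        failure,
    }

    pub struct ReconnectEventCounterVec: IntCounter {
        "type" => ReconnectEventKind,
    }
}

fn main() {
    // Registers a regular IntCounterVec and wraps it in the generated struct,
    // which holds one pre-resolved IntCounter per label value.
    let reconnect = register_static_int_counter_vec!(
        ReconnectEventCounterVec,
        "demo_reconnect_total",
        "Total number of reconnections (demo).",
        &["type"]
    )
    .unwrap();

    reconnect.success.inc();
    reconnect.failure.inc();
    assert_eq!(reconnect.success.get(), 1);
}
```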
100 changes: 75 additions & 25 deletions components/pd_client/src/util.rs
@@ -46,6 +46,7 @@ const MAX_RETRY_TIMES: u64 = 5;
// The max duration when retrying to connect to leader. No matter if the
// MAX_RETRY_TIMES is reached.
const MAX_RETRY_DURATION: Duration = Duration::from_secs(10);
const MAX_BACKOFF: Duration = Duration::from_secs(3);

// FIXME: Use a request-independent way to handle reconnection.
const GLOBAL_RECONNECT_INTERVAL: Duration = Duration::from_millis(100); // 0.1s
@@ -106,6 +107,7 @@ pub struct Inner {
pub tso: TimestampOracle,

last_try_reconnect: Instant,
bo: ExponentialBackoff,
}

impl Inner {
@@ -197,6 +199,7 @@ impl Client {
pending_heartbeat: Arc::default(),
pending_buckets: Arc::default(),
last_try_reconnect: Instant::now(),
bo: ExponentialBackoff::new(GLOBAL_RECONNECT_INTERVAL),
tso,
}),
feature_gate: FeatureGate::default(),
@@ -322,18 +325,15 @@ impl Client {
/// Note: Retrying too quickly will return an error due to cancellation.
/// Please always try to reconnect after sending the request first.
pub async fn reconnect(&self, force: bool) -> Result<()> {
- PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
+ PD_RECONNECT_COUNTER_VEC.try_connect.inc();
let start = Instant::now();

let future = {
let inner = self.inner.rl();
- if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
- {
+ if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
// Avoid unnecessary updating.
// Prevent a large number of reconnections in a short time.
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["cancel"])
- .inc();
+ PD_RECONNECT_COUNTER_VEC.cancel.inc();
return Err(box_err!("cancel reconnection due to too small interval"));
}
let connector = PdConnector::new(inner.env.clone(), inner.security_mgr.clone());
@@ -354,37 +354,38 @@

{
let mut inner = self.inner.wl();
- if start.saturating_duration_since(inner.last_try_reconnect) < GLOBAL_RECONNECT_INTERVAL
- {
+ if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
// There may be multiple reconnections that pass the read lock at the same time.
// Check again in the write lock to avoid unnecessary updating.
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["cancel"])
- .inc();
+ PD_RECONNECT_COUNTER_VEC.cancel.inc();
return Err(box_err!("cancel reconnection due to too small interval"));
}
inner.last_try_reconnect = start;
inner.bo.next_backoff();
}

slow_log!(start.saturating_elapsed(), "try reconnect pd");
let (client, target_info, members, tso) = match future.await {
Err(e) => {
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["failure"])
- .inc();
+ PD_RECONNECT_COUNTER_VEC.failure.inc();
return Err(e);
}
- Ok(None) => {
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["no-need"])
- .inc();
- return Ok(());
- }
- Ok(Some(tuple)) => {
- PD_RECONNECT_COUNTER_VEC
- .with_label_values(&["success"])
- .inc();
- tuple
- }
+ Ok(res) => {
+ // Reset the retry count.
+ {
+ let mut inner = self.inner.wl();
+ inner.bo.reset()
+ }
+ match res {
+ None => {
+ PD_RECONNECT_COUNTER_VEC.no_need.inc();
+ return Ok(());
+ }
+ Some(tuple) => {
+ PD_RECONNECT_COUNTER_VEC.success.inc();
+ tuple
+ }
+ }
+ }
};

@@ -861,6 +862,33 @@ impl PdConnector {
}
}

/// Simple backoff strategy.
struct ExponentialBackoff {
base: Duration,
interval: Duration,
}

impl ExponentialBackoff {
pub fn new(base: Duration) -> Self {
Self {
base,
interval: base,
}
}
pub fn next_backoff(&mut self) -> Duration {
self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
self.interval
}

pub fn get_interval(&self) -> Duration {
self.interval
}

pub fn reset(&mut self) {
self.interval = self.base;
}
}

pub fn trim_http_prefix(s: &str) -> &str {
s.trim_start_matches("http://")
.trim_start_matches("https://")
@@ -1005,8 +1033,11 @@ pub fn merge_bucket_stats<C: AsRef<[u8]>, I: AsRef<[u8]>>(
mod test {
use kvproto::metapb::BucketStats;

use super::*;
use crate::{merge_bucket_stats, util::find_bucket_index};

const BASE_BACKOFF: Duration = Duration::from_millis(100);

#[test]
fn test_merge_bucket_stats() {
#[allow(clippy::type_complexity)]
@@ -1122,4 +1153,23 @@ mod test {
assert_eq!(find_bucket_index(b"k7", &keys), Some(4));
assert_eq!(find_bucket_index(b"k8", &keys), Some(4));
}

#[test]
fn test_exponential_backoff() {
let mut backoff = ExponentialBackoff::new(BASE_BACKOFF);
assert_eq!(backoff.get_interval(), BASE_BACKOFF);

assert_eq!(backoff.next_backoff(), 2 * BASE_BACKOFF);
assert_eq!(backoff.next_backoff(), Duration::from_millis(400));
assert_eq!(backoff.get_interval(), Duration::from_millis(400));

// Should not exceed MAX_BACKOFF
for _ in 0..20 {
backoff.next_backoff();
}
assert_eq!(backoff.get_interval(), MAX_BACKOFF);

backoff.reset();
assert_eq!(backoff.get_interval(), BASE_BACKOFF);
}
}
