pd_client: add backoff for the reconnect retries (#15429)
ref tikv/pd#6556, close #15428

pd_client: add store-level backoff for the reconnect retries

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
nolouch and ti-chi-bot[bot] committed Aug 30, 2023
1 parent 517522b commit 4b3e33e
Showing 3 changed files with 99 additions and 42 deletions.
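
Before the file-by-file diff, a minimal, self-contained Rust sketch of the mechanism this commit adds. The ExponentialBackoff type and MAX_BACKOFF mirror the new code in components/pd_client/src/util.rs below; the driver loop, the 100 ms base, and the attempt outcomes are illustrative only, not TiKV code.

use std::time::{Duration, Instant};

const MAX_BACKOFF: Duration = Duration::from_secs(3);

struct ExponentialBackoff {
    base: Duration,
    interval: Duration,
}

impl ExponentialBackoff {
    fn new(base: Duration) -> Self {
        Self { base, interval: base }
    }
    // Double the interval after each attempt, capped at MAX_BACKOFF.
    fn next_backoff(&mut self) -> Duration {
        self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
        self.interval
    }
    fn get_interval(&self) -> Duration {
        self.interval
    }
    // A successful reconnect resets the interval to the base value.
    fn reset(&mut self) {
        self.interval = self.base;
    }
}

fn main() {
    let mut bo = ExponentialBackoff::new(Duration::from_millis(100));
    let mut last_try: Option<Instant> = None;

    for attempt in 1..=4 {
        // Requests arriving faster than the current interval are rejected;
        // this is what the "cancel" outcome in the diff records.
        if let Some(t) = last_try {
            if t.elapsed() < bo.get_interval() {
                println!("attempt {}: cancelled", attempt);
                continue;
            }
        }
        last_try = Some(Instant::now());
        println!(
            "attempt {}: allowed, next attempt must wait at least {:?}",
            attempt,
            bo.next_backoff()
        );
        // Pretend each attempt fails; wait out the backoff before retrying.
        std::thread::sleep(bo.get_interval());
    }

    // A successful reconnect would reset the interval to the base value.
    bo.reset();
    assert_eq!(bo.get_interval(), Duration::from_millis(100));
}
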
14 changes: 4 additions & 10 deletions components/pd_client/src/client_v2.rs
@@ -117,7 +117,7 @@ impl RawClient {

/// Returns Ok(true) when a new connection is established.
async fn maybe_reconnect(&mut self, ctx: &ConnectContext, force: bool) -> Result<bool> {
PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
PD_RECONNECT_COUNTER_VEC.try_connect.inc();
let start = Instant::now();

let members = self.members.clone();
@@ -135,21 +135,15 @@ impl RawClient {
.await
{
Err(e) => {
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["failure"])
.inc();
PD_RECONNECT_COUNTER_VEC.failure.inc();
return Err(e);
}
Ok(None) => {
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["no-need"])
.inc();
PD_RECONNECT_COUNTER_VEC.no_need.inc();
return Ok(false);
}
Ok(Some(tuple)) => {
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["success"])
.inc();
PD_RECONNECT_COUNTER_VEC.success.inc();
tuple
}
};
27 changes: 20 additions & 7 deletions components/pd_client/src/metrics.rs
@@ -2,7 +2,7 @@

use lazy_static::lazy_static;
use prometheus::*;
use prometheus_static_metric::{make_static_metric, register_static_histogram_vec};
use prometheus_static_metric::*;

make_static_metric! {
pub label_enum PDRequestEventType {
@@ -40,9 +40,20 @@ make_static_metric {
meta_storage_watch,
}

pub label_enum PDReconnectEventKind {
success,
failure,
no_need,
cancel,
try_connect,
}

pub struct PDRequestEventHistogramVec: Histogram {
"type" => PDRequestEventType,
}
pub struct PDReconnectEventCounterVec: IntCounter {
"type" => PDReconnectEventKind,
}
}

lazy_static! {
@@ -66,12 +77,14 @@ lazy_static! {
&["type"]
)
.unwrap();
pub static ref PD_RECONNECT_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
"tikv_pd_reconnect_total",
"Total number of PD reconnections.",
&["type"]
)
.unwrap();
pub static ref PD_RECONNECT_COUNTER_VEC: PDReconnectEventCounterVec =
register_static_int_counter_vec!(
PDReconnectEventCounterVec,
"tikv_pd_reconnect_total",
"Total number of PD reconnections.",
&["type"]
)
.unwrap();
pub static ref PD_PENDING_HEARTBEAT_GAUGE: IntGauge = register_int_gauge!(
"tikv_pd_pending_heartbeat_total",
"Total number of pending region heartbeat"
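
The metrics.rs change above swaps a dynamically-labelled counter for a static one. A minimal sketch, outside TiKV, of why that helps: prometheus_static_metric's make_static_metric! resolves the "type" label at compile time, so hot paths call a field accessor instead of with_label_values, avoiding a per-increment label lookup. The crates are the same as in the diff (prometheus, prometheus_static_metric, lazy_static); the enum, struct, and metric name here are illustrative only.

use lazy_static::lazy_static;
use prometheus::*;
use prometheus_static_metric::*;

make_static_metric! {
    pub label_enum ReconnectKind {
        success,
        failure,
    }
    pub struct ReconnectCounterVec: IntCounter {
        "type" => ReconnectKind,
    }
}

lazy_static! {
    pub static ref RECONNECT_COUNTER_VEC: ReconnectCounterVec =
        register_static_int_counter_vec!(
            ReconnectCounterVec,
            "demo_reconnect_total",
            "Total number of reconnect attempts by outcome.",
            &["type"]
        )
        .unwrap();
}

fn main() {
    // Static accessor: no label lookup on the hot path.
    RECONNECT_COUNTER_VEC.success.inc();
    // Equivalent dynamic form, for comparison:
    //     counter_vec.with_label_values(&["success"]).inc();
    assert_eq!(RECONNECT_COUNTER_VEC.success.get(), 1);
}
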
100 changes: 75 additions & 25 deletions components/pd_client/src/util.rs
@@ -50,6 +50,7 @@ const MAX_RETRY_TIMES: u64 = 5;
// The max duration when retrying to connect to leader. No matter if the
// MAX_RETRY_TIMES is reached.
const MAX_RETRY_DURATION: Duration = Duration::from_secs(10);
const MAX_BACKOFF: Duration = Duration::from_secs(3);

// FIXME: Use a request-independent way to handle reconnection.
pub const REQUEST_RECONNECT_INTERVAL: Duration = Duration::from_secs(1); // 1s
@@ -116,6 +117,7 @@ pub struct Inner {
pub rg_resp: Option<ClientDuplexReceiver<TokenBucketsResponse>>,

last_try_reconnect: Instant,
bo: ExponentialBackoff,
}

impl Inner {
@@ -168,7 +170,6 @@ pub struct Client {
pub(crate) inner: RwLock<Inner>,
pub feature_gate: FeatureGate,
enable_forwarding: bool,
retry_interval: Duration,
}

impl Client {
@@ -219,14 +220,14 @@ impl Client {
pending_heartbeat: Arc::default(),
pending_buckets: Arc::default(),
last_try_reconnect: Instant::now(),
bo: ExponentialBackoff::new(retry_interval),
tso,
meta_storage,
rg_sender: Either::Left(Some(rg_sender)),
rg_resp: Some(rg_rx),
}),
feature_gate: FeatureGate::default(),
enable_forwarding,
retry_interval,
}
}

@@ -363,17 +364,15 @@ impl Client {
/// Note: Retrying too quickly will return an error due to cancellation.
/// Please always try to reconnect after sending the request first.
pub async fn reconnect(&self, force: bool) -> Result<()> {
PD_RECONNECT_COUNTER_VEC.with_label_values(&["try"]).inc();
PD_RECONNECT_COUNTER_VEC.try_connect.inc();
let start = Instant::now();

let future = {
let inner = self.inner.rl();
if start.saturating_duration_since(inner.last_try_reconnect) < self.retry_interval {
if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
// Avoid unnecessary updating.
// Prevent a large number of reconnections in a short time.
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["cancel"])
.inc();
PD_RECONNECT_COUNTER_VEC.cancel.inc();
return Err(box_err!("cancel reconnection due to too small interval"));
}
let connector = PdConnector::new(inner.env.clone(), inner.security_mgr.clone());
@@ -394,36 +393,38 @@

{
let mut inner = self.inner.wl();
if start.saturating_duration_since(inner.last_try_reconnect) < self.retry_interval {
if start.saturating_duration_since(inner.last_try_reconnect) < inner.bo.get_interval() {
// There may be multiple reconnections that pass the read lock at the same time.
// Check again in the write lock to avoid unnecessary updating.
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["cancel"])
.inc();
PD_RECONNECT_COUNTER_VEC.cancel.inc();
return Err(box_err!("cancel reconnection due to too small interval"));
}
inner.last_try_reconnect = start;
inner.bo.next_backoff();
}

slow_log!(start.saturating_elapsed(), "try reconnect pd");
let (client, target_info, members, tso) = match future.await {
Err(e) => {
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["failure"])
.inc();
PD_RECONNECT_COUNTER_VEC.failure.inc();
return Err(e);
}
Ok(None) => {
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["no-need"])
.inc();
return Ok(());
}
Ok(Some(tuple)) => {
PD_RECONNECT_COUNTER_VEC
.with_label_values(&["success"])
.inc();
tuple
Ok(res) => {
// Reset the retry count.
{
let mut inner = self.inner.wl();
inner.bo.reset()
}
match res {
None => {
PD_RECONNECT_COUNTER_VEC.no_need.inc();
return Ok(());
}
Some(tuple) => {
PD_RECONNECT_COUNTER_VEC.success.inc();
tuple
}
}
}
};

@@ -900,6 +901,33 @@ impl PdConnector {
}
}

/// Simple backoff strategy.
struct ExponentialBackoff {
base: Duration,
interval: Duration,
}

impl ExponentialBackoff {
pub fn new(base: Duration) -> Self {
Self {
base,
interval: base,
}
}
pub fn next_backoff(&mut self) -> Duration {
self.interval = std::cmp::min(self.interval * 2, MAX_BACKOFF);
self.interval
}

pub fn get_interval(&self) -> Duration {
self.interval
}

pub fn reset(&mut self) {
self.interval = self.base;
}
}

pub fn trim_http_prefix(s: &str) -> &str {
s.trim_start_matches("http://")
.trim_start_matches("https://")
@@ -1045,8 +1073,11 @@ pub fn merge_bucket_stats<C: AsRef<[u8]>, I: AsRef<[u8]>>(
mod test {
use kvproto::metapb::BucketStats;

use super::*;
use crate::{merge_bucket_stats, util::find_bucket_index};

const BASE_BACKOFF: Duration = Duration::from_millis(100);

#[test]
fn test_merge_bucket_stats() {
#[allow(clippy::type_complexity)]
@@ -1162,4 +1193,23 @@ mod test {
assert_eq!(find_bucket_index(b"k7", &keys), Some(4));
assert_eq!(find_bucket_index(b"k8", &keys), Some(4));
}

#[test]
fn test_exponential_backoff() {
let mut backoff = ExponentialBackoff::new(BASE_BACKOFF);
assert_eq!(backoff.get_interval(), BASE_BACKOFF);

assert_eq!(backoff.next_backoff(), 2 * BASE_BACKOFF);
assert_eq!(backoff.next_backoff(), Duration::from_millis(400));
assert_eq!(backoff.get_interval(), Duration::from_millis(400));

// Should not exceed MAX_BACKOFF
for _ in 0..20 {
backoff.next_backoff();
}
assert_eq!(backoff.get_interval(), MAX_BACKOFF);

backoff.reset();
assert_eq!(backoff.get_interval(), BASE_BACKOFF);
}
}
