Skip to content

Commit

Permalink
pd_client: reduce store heartbeat retries to prevent heartbeat storm (#…
Browse files Browse the repository at this point in the history
…15191) (#15231)

ref tikv/pd#6556, close #15184

The store heartbeat is reported periodically, so there is no need to retry it:
- do not retry the store heartbeat
- change `remain_reconnect_count` as `remain_request_count`
- fix some metrics

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: nolouch <nolouch@gmail.com>
  • Loading branch information
ti-chi-bot and nolouch committed Aug 14, 2023
1 parent 984ad89 commit c5e0fb9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 19 deletions.
18 changes: 11 additions & 7 deletions components/pd_client/src/client.rs
Expand Up @@ -285,6 +285,8 @@ impl fmt::Debug for RpcClient {
}

const LEADER_CHANGE_RETRY: usize = 10;
// Budget for requests that are re-sent periodically anyway (e.g. store_heartbeat):
// allow exactly one attempt and never retry, so a PD hiccup cannot amplify into
// a heartbeat storm.
const NO_RETRY: usize = 1;

impl PdClient for RpcClient {
fn load_global_config(&self, list: Vec<String>) -> PdFuture<HashMap<String, String>> {
Expand Down Expand Up @@ -832,10 +834,14 @@ impl PdClient for RpcClient {
})
};
Box::pin(async move {
let resp = handler.await?;
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["store_heartbeat"])
.observe(duration_to_sec(timer.saturating_elapsed()));
let resp = handler
.map(|res| {
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["store_heartbeat"])
.observe(timer.saturating_elapsed_secs());
res
})
.await?;
check_resp_header(resp.get_header())?;
match feature_gate.set_version(resp.get_cluster_version()) {
Err(_) => warn!("invalid cluster version: {}", resp.get_cluster_version()),
Expand All @@ -846,9 +852,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_batch_split(&self, regions: Vec<metapb::Region>) -> PdFuture<()> {
Expand Down
32 changes: 20 additions & 12 deletions components/pd_client/src/util.rs
Expand Up @@ -304,7 +304,7 @@ impl Client {
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
Request {
remain_reconnect_count: retry,
remain_request_count: retry,
request_sent: 0,
client: self.clone(),
req,
Expand Down Expand Up @@ -398,7 +398,7 @@ impl Client {

/// The context of sending requets.
pub struct Request<Req, F> {
remain_reconnect_count: usize,
remain_request_count: usize,
request_sent: usize,
client: Arc<Client>,
req: Req,
Expand All @@ -413,15 +413,11 @@ where
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
async fn reconnect_if_needed(&mut self) -> Result<()> {
debug!("reconnecting ..."; "remain" => self.remain_reconnect_count);
if self.request_sent < MAX_REQUEST_COUNT {
debug!("reconnecting ..."; "remain" => self.remain_request_count);
if self.request_sent < MAX_REQUEST_COUNT && self.request_sent < self.remain_request_count {
return Ok(());
}
if self.remain_reconnect_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
// Updating client.
self.remain_reconnect_count -= 1;
// FIXME: should not block the core.
debug!("(re)connecting PD client");
match self.client.reconnect(true).await {
Expand All @@ -441,18 +437,22 @@ where
}

/// Performs a single attempt of the request, consuming one unit of the
/// remaining request budget.
///
/// Returns an error without sending anything once the budget
/// (`remain_request_count`) is exhausted; otherwise clones the request,
/// bumps the sent counter, and awaits the user-supplied request function.
async fn send_and_receive(&mut self) -> Result<Resp> {
    // `checked_sub` doubles as the budget check: `None` means the
    // retry limit was already reached before this attempt.
    match self.remain_request_count.checked_sub(1) {
        None => Err(box_err!("request retry exceeds limit")),
        Some(rest) => {
            self.remain_request_count = rest;
            self.request_sent += 1;
            debug!("request sent: {}", self.request_sent);
            let request = self.req.clone();
            (self.func)(&self.client, request).await
        }
    }
}

fn should_not_retry(resp: &Result<Resp>) -> bool {
fn should_not_retry(&self, resp: &Result<Resp>) -> bool {
match resp {
Ok(_) => true,
Err(err) => {
// these errors are not caused by network, no need to retry
if err.retryable() {
if err.retryable() && self.remain_request_count > 0 {
error!(?*err; "request failed, retry");
false
} else {
Expand All @@ -469,7 +469,7 @@ where
loop {
{
let resp = self.send_and_receive().await;
if Self::should_not_retry(&resp) {
if self.should_not_retry(&resp) {
return resp;
}
}
Expand Down Expand Up @@ -615,10 +615,14 @@ impl PdConnector {
});
let client = PdClientStub::new(channel.clone());
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let timer = Instant::now();
let response = client
.get_members_async_opt(&GetMembersRequest::default(), option)
.unwrap_or_else(|e| panic!("fail to request PD {} err {:?}", "get_members", e))
.await;
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_members"])
.observe(timer.saturating_elapsed_secs());
match response {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
Expand Down Expand Up @@ -783,7 +787,7 @@ impl PdConnector {
Ok((None, has_network_error))
}

pub async fn reconnect_leader(
async fn reconnect_leader(
&self,
leader: &Member,
) -> Result<(Option<(PdClientStub, String)>, bool)> {
Expand Down Expand Up @@ -829,6 +833,7 @@ impl PdConnector {
let client_urls = leader.get_client_urls();
for leader_url in client_urls {
let target = TargetInfo::new(leader_url.clone(), &ep);
let timer = Instant::now();
let response = client
.get_members_async_opt(
&GetMembersRequest::default(),
Expand All @@ -840,6 +845,9 @@ impl PdConnector {
panic!("fail to request PD {} err {:?}", "get_members", e)
})
.await;
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_members"])
.observe(timer.saturating_elapsed_secs());
match response {
Ok(_) => return Ok(Some((client, target))),
Err(_) => continue,
Expand Down

0 comments on commit c5e0fb9

Please sign in to comment.