pd_client: reduce store heartbeat retries to prevent heartbeat storm (#15191)

ref tikv/pd#6556, close #15184

The store heartbeat is reported periodically, so there is no need to retry it (see the sketch after the changed-files summary below):
- do not retry the store heartbeat
- rename `remain_reconnect_count` to `remain_request_count`
- fix some metrics: record store_heartbeat latency on the error path and time get_members

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
nolouch and ti-chi-bot[bot] committed Jul 28, 2023
1 parent cf255e3 commit e823877
Showing 3 changed files with 32 additions and 19 deletions.
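Since the store heartbeat is a periodic report, a failed send can simply be dropped; the next tick carries fresh data anyway. The sketch below is a simplified model of the request budget this commit introduces, not the actual TiKV executor (the `execute` function and the `String` error type are illustrative): the budget counts requests rather than reconnects, so `NO_RETRY == 1` means exactly one attempt.

```rust
/// Simplified model of the request budget (illustrative only; the real
/// executor lives in components/pd_client/src/util.rs below).
fn execute<T>(
    mut remain_request_count: usize,
    mut send: impl FnMut() -> Result<T, String>,
) -> Result<T, String> {
    loop {
        if remain_request_count == 0 {
            return Err("request retry exceeds limit".to_string());
        }
        remain_request_count -= 1;
        match send() {
            Ok(resp) => return Ok(resp),
            // Retry only while budget remains; otherwise surface the error.
            Err(e) if remain_request_count > 0 => eprintln!("request failed, retry: {e}"),
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    // With NO_RETRY (a budget of 1), a failing store heartbeat is sent once
    // and the error is returned; the next periodic heartbeat is fresh anyway.
    let result = execute(1, || Err::<(), _>("pd unreachable".to_string()));
    assert!(result.is_err());
}
```

In client.rs below, store_heartbeat switches from LEADER_CHANGE_RETRY (10) to NO_RETRY (1), while one-off requests keep the larger budget.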
18 changes: 11 additions & 7 deletions components/pd_client/src/client.rs
@@ -325,6 +325,8 @@ impl fmt::Debug for RpcClient {
}

const LEADER_CHANGE_RETRY: usize = 10;
// periodic request like store_heartbeat, we don't need to retry.
const NO_RETRY: usize = 1;

impl PdClient for RpcClient {
fn store_global_config(
@@ -870,10 +872,14 @@ impl PdClient for RpcClient {
})
};
Box::pin(async move {
let resp = handler.await?;
PD_REQUEST_HISTOGRAM_VEC
.store_heartbeat
.observe(timer.saturating_elapsed_secs());
let resp = handler
.map(|res| {
PD_REQUEST_HISTOGRAM_VEC
.store_heartbeat
.observe(timer.saturating_elapsed_secs());
res
})
.await?;
check_resp_header(resp.get_header())?;
match feature_gate.set_version(resp.get_cluster_version()) {
Err(_) => warn!("invalid cluster version: {}", resp.get_cluster_version()),
@@ -884,9 +890,7 @@
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_batch_split(&self, regions: Vec<metapb::Region>) -> PdFuture<()> {
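The store_heartbeat change above also covers the "fix some metrics" bullet: with the old `handler.await?`, an error returned early and the duration was never observed, whereas observing inside `map` on the future records it whenever the call resolves, Ok or Err. A minimal sketch of that pattern, assuming the futures crate's `FutureExt::map` and `executor::block_on` (the `timed` helper and its parameters are illustrative):

```rust
use std::future::Future;
use std::time::Instant;

use futures::FutureExt; // provides `map` on futures

/// Records elapsed time when `fut` resolves, on both the Ok and Err paths
/// (illustrative helper; TiKV observes into PD_REQUEST_HISTOGRAM_VEC instead).
async fn timed<T, E>(
    fut: impl Future<Output = Result<T, E>>,
    observe: impl FnOnce(f64),
) -> Result<T, E> {
    let timer = Instant::now();
    fut.map(|res| {
        // Runs whether `res` is Ok or Err, unlike observing after `.await?`.
        observe(timer.elapsed().as_secs_f64());
        res
    })
    .await
}

fn main() {
    let res: Result<u32, &str> =
        futures::executor::block_on(timed(async { Err("pd unreachable") }, |secs| {
            println!("store_heartbeat took {secs:.6}s");
        }));
    assert!(res.is_err());
}
```

The util.rs changes below apply the same kind of timing to get_members.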
1 change: 1 addition & 0 deletions components/pd_client/src/metrics.rs
@@ -33,6 +33,7 @@ make_static_metric! {
store_heartbeat,
tso,
scan_regions,
get_members,

meta_storage_put,
meta_storage_get,
32 changes: 20 additions & 12 deletions components/pd_client/src/util.rs
@@ -312,7 +312,7 @@ impl Client {
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
Request {
remain_reconnect_count: retry,
remain_request_count: retry,
request_sent: 0,
client: self.clone(),
req,
@@ -404,7 +404,7 @@ impl Client {

/// The context of sending requets.
pub struct Request<Req, F> {
remain_reconnect_count: usize,
remain_request_count: usize,
request_sent: usize,
client: Arc<Client>,
req: Req,
@@ -419,15 +419,11 @@
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
async fn reconnect_if_needed(&mut self) -> Result<()> {
debug!("reconnecting ..."; "remain" => self.remain_reconnect_count);
if self.request_sent < MAX_REQUEST_COUNT {
debug!("reconnecting ..."; "remain" => self.remain_request_count);
if self.request_sent < MAX_REQUEST_COUNT && self.request_sent < self.remain_request_count {
return Ok(());
}
if self.remain_reconnect_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
// Updating client.
self.remain_reconnect_count -= 1;
// FIXME: should not block the core.
debug!("(re)connecting PD client");
match self.client.reconnect(true).await {
@@ -447,18 +443,22 @@
}

async fn send_and_receive(&mut self) -> Result<Resp> {
if self.remain_request_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
self.request_sent += 1;
self.remain_request_count -= 1;
debug!("request sent: {}", self.request_sent);
let r = self.req.clone();
(self.func)(&self.client, r).await
}

fn should_not_retry(resp: &Result<Resp>) -> bool {
fn should_not_retry(&self, resp: &Result<Resp>) -> bool {
match resp {
Ok(_) => true,
Err(err) => {
// these errors are not caused by network, no need to retry
if err.retryable() {
if err.retryable() && self.remain_request_count > 0 {
error!(?*err; "request failed, retry");
false
} else {
@@ -475,7 +475,7 @@
loop {
{
let resp = self.send_and_receive().await;
if Self::should_not_retry(&resp) {
if self.should_not_retry(&resp) {
return resp;
}
}
@@ -621,10 +621,14 @@ impl PdConnector {
});
let client = PdClientStub::new(channel.clone());
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let timer = Instant::now();
let response = client
.get_members_async_opt(&GetMembersRequest::default(), option)
.unwrap_or_else(|e| panic!("fail to request PD {} err {:?}", "get_members", e))
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
@@ -789,7 +793,7 @@ impl PdConnector {
Ok((None, has_network_error))
}

pub async fn reconnect_leader(
async fn reconnect_leader(
&self,
leader: &Member,
) -> Result<(Option<(PdClientStub, String)>, bool)> {
@@ -835,6 +839,7 @@ impl PdConnector {
let client_urls = leader.get_client_urls();
for leader_url in client_urls {
let target = TargetInfo::new(leader_url.clone(), &ep);
let timer = Instant::now();
let response = client
.get_members_async_opt(
&GetMembersRequest::default(),
@@ -846,6 +851,9 @@
panic!("fail to request PD {} err {:?}", "get_members", e)
})
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(_) => return Ok(Some((client, target))),
Err(_) => continue,
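The retry decision in util.rs now also consults the remaining budget: should_not_retry takes &self and surfaces the error once remain_request_count reaches zero, even for retryable errors. A condensed sketch of that predicate (a standalone function for illustration; the real method also logs and inspects the concrete error):

```rust
/// Condensed form of the retry decision above (illustrative only).
fn should_not_retry(retryable: bool, remain_request_count: usize) -> bool {
    // Retry only for retryable errors *and* while the request budget lasts.
    !(retryable && remain_request_count > 0)
}

fn main() {
    // Store heartbeat with NO_RETRY: after its single send the budget is 0,
    // so even a retryable error is returned to the caller.
    assert!(should_not_retry(true, 0));
    // Requests with budget left (e.g. LEADER_CHANGE_RETRY) still retry.
    assert!(!should_not_retry(true, 5));
    // Non-retryable errors are never retried, regardless of budget.
    assert!(should_not_retry(false, 5));
}
```

Reading this together with the new gate in reconnect_if_needed, a NO_RETRY request appears to neither retry nor trigger a PD reconnect on its own, which is what keeps a struggling PD from being hammered by heartbeat retries.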
