Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd_client: reduce store heartbeat retires to prevent heartbeat storm (#15191) #15230

Open
wants to merge 1 commit into
base: release-6.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 14 additions & 3 deletions components/pd_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ impl fmt::Debug for RpcClient {
}

const LEADER_CHANGE_RETRY: usize = 10;
// periodic request like store_heartbeat, we don't need to retry.
const NO_RETRY: usize = 1;

impl PdClient for RpcClient {
fn load_global_config(&self, list: Vec<String>) -> PdFuture<HashMap<String, String>> {
Expand Down Expand Up @@ -759,10 +761,21 @@ impl PdClient for RpcClient {
})
};
Box::pin(async move {
<<<<<<< HEAD
let resp = handler.await?;
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["store_heartbeat"])
.observe(duration_to_sec(timer.saturating_elapsed()));
=======
let resp = handler
.map(|res| {
PD_REQUEST_HISTOGRAM_VEC
.store_heartbeat
.observe(timer.saturating_elapsed_secs());
res
})
.await?;
>>>>>>> e8238777ea (pd_client: reduce store heartbeat retires to prevent heartbeat storm (#15191))
check_resp_header(resp.get_header())?;
match feature_gate.set_version(resp.get_cluster_version()) {
Err(_) => warn!("invalid cluster version: {}", resp.get_cluster_version()),
Expand All @@ -773,9 +786,7 @@ impl PdClient for RpcClient {
}) as PdFuture<_>
};

self.pd_client
.request(req, executor, LEADER_CHANGE_RETRY)
.execute()
self.pd_client.request(req, executor, NO_RETRY).execute()
}

fn report_batch_split(&self, regions: Vec<metapb::Region>) -> PdFuture<()> {
Expand Down
45 changes: 45 additions & 0 deletions components/pd_client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,51 @@

use lazy_static::lazy_static;
use prometheus::*;
<<<<<<< HEAD
=======
use prometheus_static_metric::{make_static_metric, register_static_histogram_vec};

make_static_metric! {
pub label_enum PDRequestEventType {
get_region,
get_region_by_id,
get_region_leader_by_id,
scatter_region,
get_store,
get_store_async,
put_store,
get_all_stores,
get_store_and_stats,
store_global_config,
load_global_config,
watch_global_config,
bootstrap_cluster,
is_cluster_bootstrapped,
get_cluster_config,
ask_split,
ask_batch_split,
report_batch_split,
get_gc_safe_point,
update_service_safe_point,
min_resolved_ts,
get_operator,
alloc_id,
is_recovering_marked,
store_heartbeat,
tso,
scan_regions,
get_members,

meta_storage_put,
meta_storage_get,
meta_storage_watch,
}

pub struct PDRequestEventHistogramVec: Histogram {
"type" => PDRequestEventType,
}
}
>>>>>>> e8238777ea (pd_client: reduce store heartbeat retires to prevent heartbeat storm (#15191))

lazy_static! {
pub static ref PD_REQUEST_HISTOGRAM_VEC: HistogramVec = register_histogram_vec!(
Expand Down
32 changes: 20 additions & 12 deletions components/pd_client/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl Client {
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
Request {
remain_reconnect_count: retry,
remain_request_count: retry,
request_sent: 0,
client: self.clone(),
req,
Expand Down Expand Up @@ -389,7 +389,7 @@ impl Client {

/// The context of sending requets.
pub struct Request<Req, F> {
remain_reconnect_count: usize,
remain_request_count: usize,
request_sent: usize,
client: Arc<Client>,
req: Req,
Expand All @@ -404,15 +404,11 @@ where
F: FnMut(&Client, Req) -> PdFuture<Resp> + Send + 'static,
{
async fn reconnect_if_needed(&mut self) -> Result<()> {
debug!("reconnecting ..."; "remain" => self.remain_reconnect_count);
if self.request_sent < MAX_REQUEST_COUNT {
debug!("reconnecting ..."; "remain" => self.remain_request_count);
if self.request_sent < MAX_REQUEST_COUNT && self.request_sent < self.remain_request_count {
return Ok(());
}
if self.remain_reconnect_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
// Updating client.
self.remain_reconnect_count -= 1;
// FIXME: should not block the core.
debug!("(re)connecting PD client");
match self.client.reconnect(true).await {
Expand All @@ -432,18 +428,22 @@ where
}

async fn send_and_receive(&mut self) -> Result<Resp> {
if self.remain_request_count == 0 {
return Err(box_err!("request retry exceeds limit"));
}
self.request_sent += 1;
self.remain_request_count -= 1;
debug!("request sent: {}", self.request_sent);
let r = self.req.clone();
(self.func)(&self.client, r).await
}

fn should_not_retry(resp: &Result<Resp>) -> bool {
fn should_not_retry(&self, resp: &Result<Resp>) -> bool {
match resp {
Ok(_) => true,
Err(err) => {
// these errors are not caused by network, no need to retry
if err.retryable() {
if err.retryable() && self.remain_request_count > 0 {
error!(?*err; "request failed, retry");
false
} else {
Expand All @@ -460,7 +460,7 @@ where
loop {
{
let resp = self.send_and_receive().await;
if Self::should_not_retry(&resp) {
if self.should_not_retry(&resp) {
return resp;
}
}
Expand Down Expand Up @@ -602,10 +602,14 @@ impl PdConnector {
});
let client = PdClientStub::new(channel);
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let timer = Instant::now();
let response = client
.get_members_async_opt(&GetMembersRequest::default(), option)
.unwrap_or_else(|e| panic!("fail to request PD {} err {:?}", "get_members", e))
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(resp) => Ok((client, resp)),
Err(e) => Err(Error::Grpc(e)),
Expand Down Expand Up @@ -759,7 +763,7 @@ impl PdConnector {
Ok((None, has_network_error))
}

pub async fn reconnect_leader(
async fn reconnect_leader(
&self,
leader: &Member,
) -> Result<(Option<(PdClientStub, String)>, bool)> {
Expand Down Expand Up @@ -803,6 +807,7 @@ impl PdConnector {
let client_urls = leader.get_client_urls();
for leader_url in client_urls {
let target = TargetInfo::new(leader_url.clone(), &ep);
let timer = Instant::now();
let response = client
.get_members_async_opt(
&GetMembersRequest::default(),
Expand All @@ -814,6 +819,9 @@ impl PdConnector {
panic!("fail to request PD {} err {:?}", "get_members", e)
})
.await;
PD_REQUEST_HISTOGRAM_VEC
.get_members
.observe(timer.saturating_elapsed_secs());
match response {
Ok(_) => return Ok(Some((client, target))),
Err(_) => continue,
Expand Down