Skip to content

Commit

Permalink
pd-client: pd client should update if the grpc stream sender failed. (#…
Browse files Browse the repository at this point in the history
…13094) (#13137)

close #12934, ref #13094

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: buffer <1045931706@qq.com>
  • Loading branch information
ti-srebot and bufferflies committed Jul 27, 2022
1 parent 7dedb3a commit 1cb47d2
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 3 deletions.
1 change: 1 addition & 0 deletions components/error_code/src/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ define_error_codes!(
CLUSTER_NOT_BOOTSTRAPPED => ("ClusterNotBootstraped", "", ""),
INCOMPATIBLE => ("Imcompatible", "", ""),
GRPC => ("gRPC", "", ""),
STREAM_DISCONNECT => ("StreamDisconnect","",""),
REGION_NOT_FOUND => ("RegionNotFound", "", ""),
STORE_TOMBSTONE => ("StoreTombstone", "", ""),
GLOBAL_CONFIG_NOT_FOUND => ("GlobalConfigNotFound","",""),
Expand Down
8 changes: 6 additions & 2 deletions components/pd_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,9 @@ impl PdClient for RpcClient {
if last > last_report {
last_report = last - 1;
}
fail::fail_point!("region_heartbeat_send_failed", |_| {
Err(Error::Grpc(grpcio::Error::RemoteStopped))
});
Ok((r, WriteFlags::default()))
}))
.await;
Expand All @@ -643,7 +646,8 @@ impl PdClient for RpcClient {
.expect("expect region heartbeat sender");
let ret = sender
.unbounded_send(req)
.map_err(|e| Error::Other(Box::new(e)));
.map_err(|e| Error::StreamDisconnect(e.into_send_error()));

Box::pin(future::ready(ret)) as PdFuture<_>
};

Expand Down Expand Up @@ -1048,7 +1052,7 @@ impl PdClient for RpcClient {
.expect("expect region buckets sender");
let ret = sender
.unbounded_send(req)
.map_err(|e| Error::Other(Box::new(e)));
.map_err(|e| Error::StreamDisconnect(e.into_send_error()));
Box::pin(future::ready(ret)) as PdFuture<_>
};

Expand Down
6 changes: 5 additions & 1 deletion components/pd_client/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::{error, result};

use error_code::{self, ErrorCode, ErrorCodeExt};
use futures::channel::mpsc::SendError;
use thiserror::Error;

#[derive(Debug, Error)]
Expand All @@ -15,6 +16,8 @@ pub enum Error {
Incompatible,
#[error("{0}")]
Grpc(#[from] grpcio::Error),
#[error("{0}")]
StreamDisconnect(#[from] SendError),
#[error("unknown error {0:?}")]
Other(#[from] Box<dyn error::Error + Sync + Send>),
#[error("region is not found for key {}", log_wrappers::Value::key(.0))]
Expand All @@ -30,7 +33,7 @@ pub type Result<T> = result::Result<T, Error>;
impl Error {
pub fn retryable(&self) -> bool {
match self {
Error::Grpc(_) | Error::ClusterNotBootstrapped(_) => true,
Error::Grpc(_) | Error::ClusterNotBootstrapped(_) | Error::StreamDisconnect(_) => true,
Error::Other(_)
| Error::RegionNotFound(_)
| Error::StoreTombstone(_)
Expand All @@ -48,6 +51,7 @@ impl ErrorCodeExt for Error {
Error::ClusterNotBootstrapped(_) => error_code::pd::CLUSTER_NOT_BOOTSTRAPPED,
Error::Incompatible => error_code::pd::INCOMPATIBLE,
Error::Grpc(_) => error_code::pd::GRPC,
Error::StreamDisconnect(_) => error_code::pd::STREAM_DISCONNECT,
Error::RegionNotFound(_) => error_code::pd::REGION_NOT_FOUND,
Error::StoreTombstone(_) => error_code::pd::STORE_TOMBSTONE,
Error::GlobalConfigNotFound(_) => error_code::pd::GLOBAL_CONFIG_NOT_FOUND,
Expand Down
1 change: 1 addition & 0 deletions components/test_pd/src/mocker/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ impl PdMocker for Service {
.insert(region_id, req.get_leader().clone());

let mut resp = RegionHeartbeatResponse::default();
resp.set_region_id(req.get_region().get_id());
let header = Service::header();
resp.set_header(header);
Some(Ok(resp))
Expand Down
53 changes: 53 additions & 0 deletions tests/integrations/pd/test_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,59 @@ fn test_change_leader_async() {
panic!("failed, leader should changed");
}

/// Checks how the PD client behaves while region heartbeat sending fails and
/// after it stops failing.
///
/// While the `region_heartbeat_send_failed` fail point is configured, the test
/// expects no heartbeat response within 100ms and expects `get_region_by_id`
/// to return an error. After the fail point is removed, it expects a heartbeat
/// response for region 1 and expects `get_region_by_id(1)` to return that
/// region.
#[test]
fn test_pd_client_heartbeat_send_failed() {
    /// Removes the named fail point when dropped, so a failed assertion in
    /// this test cannot leave the process-global fail point configured for
    /// other tests running in the same binary.
    struct FailPointGuard(&'static str);

    impl Drop for FailPointGuard {
        fn drop(&mut self) {
            fail::remove(self.0);
        }
    }

    let pd_client_send_fail_fp = "region_heartbeat_send_failed";
    fail::cfg(pd_client_send_fail_fp, "return()").unwrap();
    let fp_guard = FailPointGuard(pd_client_send_fail_fp);

    let server = MockServer::with_case(1, Arc::new(AlreadyBootstrapped));
    let eps = server.bind_addrs();

    let client = new_client(eps, None);
    let poller = Builder::new_multi_thread()
        .thread_name(thd_name!("poller"))
        .worker_threads(1)
        .build()
        .unwrap();

    // Forward every heartbeat response to `rx` so the test thread can observe it.
    let (tx, rx) = mpsc::channel();
    let f =
        client.handle_region_heartbeat_response(1, move |resp| tx.send(resp).unwrap_or_default());
    poller.spawn(f);

    // Sends one heartbeat for region 1, then checks both the heartbeat
    // response and a follow-up `get_region_by_id` call against the expected
    // outcome.
    let send_heartbeat_and_check = |expect_success: bool| {
        let mut region = metapb::Region::default();
        region.set_id(1);
        poller.spawn(client.region_heartbeat(
            store::RAFT_INIT_LOG_TERM,
            region,
            metapb::Peer::default(),
            RegionStat::default(),
            None,
        ));

        let rsp = rx.recv_timeout(Duration::from_millis(100));
        if expect_success {
            let resp = rsp.expect("expect a heartbeat response once sending succeeds");
            assert_eq!(resp.get_region_id(), 1);
        } else {
            assert!(rsp.is_err());
        }

        let region = block_on(client.get_region_by_id(1));
        if expect_success {
            let r = region
                .expect("expect get_region_by_id to succeed")
                .expect("expect region 1 to be found");
            assert_eq!(1, r.get_id());
        } else {
            assert!(region.is_err());
        }
    };

    // Sending fails while the fail point is active (simulating a blocked network).
    send_heartbeat_and_check(false);

    // Dropping the guard removes the fail point, simulating network recovery.
    drop(fp_guard);

    // Sending succeeds after the network has recovered.
    send_heartbeat_and_check(true);
}

#[test]
fn test_region_heartbeat_on_leader_change() {
let eps_count = 3;
Expand Down

0 comments on commit 1cb47d2

Please sign in to comment.