diff --git a/src/pd/client.rs b/src/pd/client.rs index 6dc786a25fc..0a4610cfe33 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -13,17 +13,18 @@ use std::fmt; use std::sync::{Arc, RwLock}; +use std::time::Duration; use protobuf::RepeatedField; use futures::{future, Future, Sink, Stream}; use futures::sync::mpsc::{self, UnboundedSender}; -use grpc::{EnvBuilder, WriteFlags}; +use grpc::{CallOption, EnvBuilder, WriteFlags}; use kvproto::metapb; use kvproto::pdpb::{self, Member}; use util::{Either, HandyRwLock}; use pd::PdFuture; -use super::{Error, PdClient, RegionStat, Result}; +use super::{Error, PdClient, RegionStat, Result, REQUEST_TIMEOUT}; use super::util::{check_resp_header, sync_request, validate_endpoints, Inner, LeaderClient}; const CQ_COUNT: usize = 1; @@ -86,7 +87,10 @@ impl PdClient for RpcClient { let resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.bootstrap(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.bootstrap_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); Ok(()) @@ -99,7 +103,10 @@ impl PdClient for RpcClient { let resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.is_bootstrapped(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.is_bootstrapped_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); @@ -113,7 +120,10 @@ impl PdClient for RpcClient { let resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.alloc_id(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.alloc_id_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); @@ -128,7 +138,10 @@ impl PdClient for RpcClient { let resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.put_store(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.put_store_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); @@ -143,7 +156,10 @@ impl PdClient for RpcClient { let mut resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.get_store(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.get_store_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); @@ -157,7 +173,10 @@ impl PdClient for RpcClient { let mut resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.get_cluster_config(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.get_cluster_config_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); @@ -172,7 +191,10 @@ impl PdClient for RpcClient { let mut resp = try!(sync_request( &self.leader_client, LEADER_CHANGE_RETRY, - |client| client.get_region(req.clone()) + |client| { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + client.get_region_opt(req.clone(), option) + } )); try!(check_resp_header(resp.get_header())); @@ -185,7 +207,8 @@ impl PdClient for RpcClient { req.set_region_id(region_id); let executor = |client: &RwLock, req: pdpb::GetRegionByIDRequest| { - let handler = client.rl().client.get_region_by_id_async(req); + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + let handler = client.rl().client.get_region_by_id_async_opt(req, option); handler .map_err(Error::Grpc) .and_then(|mut resp| { @@ -268,7 +291,8 @@ impl PdClient for RpcClient { req.set_region(region); let executor = |client: &RwLock, req: pdpb::AskSplitRequest| { - let handler = client.rl().client.ask_split_async(req); + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + let handler = client.rl().client.ask_split_async_opt(req, option); handler .map_err(Error::Grpc) .and_then(|resp| { @@ -289,7 +313,8 @@ impl PdClient for RpcClient { req.set_stats(stats); let executor = |client: &RwLock, req: pdpb::StoreHeartbeatRequest| { - let handler = client.rl().client.store_heartbeat_async(req); + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + let handler = client.rl().client.store_heartbeat_async_opt(req, option); handler .map_err(Error::Grpc) .and_then(|resp| { @@ -311,7 +336,8 @@ impl PdClient for RpcClient { req.set_right(right); let executor = |client: &RwLock, req: pdpb::ReportSplitRequest| { - let handler = client.rl().client.report_split_async(req); + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + let handler = client.rl().client.report_split_async_opt(req, option); handler .map_err(Error::Grpc) .and_then(|resp| { diff --git a/src/pd/mod.rs b/src/pd/mod.rs index 113110c3b41..d8e6f3ed64a 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -137,3 +137,5 @@ pub trait PdClient: Send + Sync { // Report pd the split region. fn report_split(&self, left: metapb::Region, right: metapb::Region) -> PdFuture<()>; } + +const REQUEST_TIMEOUT: u64 = 2; // 2s diff --git a/src/pd/util.rs b/src/pd/util.rs index 6ee19f25ba2..cc1a2cfb997 100644 --- a/src/pd/util.rs +++ b/src/pd/util.rs @@ -22,18 +22,17 @@ use futures::{task, Async, BoxFuture, Future, Poll, Stream}; use futures::task::Task; use futures::future::{loop_fn, ok, Loop}; use futures::sync::mpsc::UnboundedSender; -use grpc::{ChannelBuilder, ClientDuplexReceiver, ClientDuplexSender, Environment, +use grpc::{CallOption, ChannelBuilder, ClientDuplexReceiver, ClientDuplexSender, Environment, Result as GrpcResult}; use tokio_timer::Timer; -use rand::{self, Rng}; use kvproto::pdpb::{ErrorType, GetMembersRequest, GetMembersResponse, Member, RegionHeartbeatRequest, RegionHeartbeatResponse, ResponseHeader}; use kvproto::pdpb_grpc::PdClient; use prometheus::HistogramTimer; use util::{Either, HandyRwLock}; -use pd::{Error, PdFuture, Result}; -use pd::metrics::PD_SEND_MSG_HISTOGRAM; +use super::{Error, PdFuture, Result, REQUEST_TIMEOUT}; +use super::metrics::PD_SEND_MSG_HISTOGRAM; pub struct Inner { env: Arc, @@ -199,7 +198,7 @@ pub struct Request { timer: Option, } -const MAX_REQUEST_COUNT: usize = 5; +const MAX_REQUEST_COUNT: usize = 3; impl Request where @@ -378,7 +377,8 @@ fn connect(env: Arc, addr: &str) -> Result<(PdClient, GetMembersRes let addr = addr.trim_left_matches("http://"); let channel = ChannelBuilder::new(env).connect(addr); let client = PdClient::new(channel); - match client.get_members(GetMembersRequest::new()) { + let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)); + match client.get_members_opt(GetMembersRequest::new(), option) { Ok(resp) => Ok((client, resp)), Err(e) => Err(Error::Grpc(e)), } @@ -388,16 +388,17 @@ pub fn try_connect_leader( env: Arc, previous: &GetMembersResponse, ) -> Result<(PdClient, GetMembersResponse)> { - // Try to connect other members. - // Randomize endpoints. + let previous_leader = previous.get_leader(); let members = previous.get_members(); - let mut indexes: Vec = (0..members.len()).collect(); - rand::thread_rng().shuffle(&mut indexes); - let cluster_id = previous.get_header().get_cluster_id(); let mut resp = None; - 'outer: for i in indexes { - for ep in members[i].get_client_urls() { + // Try to connect to other members, then the previous leader. + 'outer: for m in members + .into_iter() + .filter(|m| *m != previous_leader) + .chain(&[previous_leader.clone()]) + { + for ep in m.get_client_urls() { match connect(env.clone(), ep.as_str()) { Ok((_, r)) => { let new_cluster_id = r.get_header().get_cluster_id();