From 25559eb840d339dcf594e4db6f189f0b4141a082 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 3 Oct 2018 12:25:31 +0800 Subject: [PATCH 1/5] WIP: TSO happy path Signed-off-by: Xiaoguang Sun --- examples/pd-client.rs | 4 + src/pd/client.rs | 59 ++------ src/pd/{util.rs => leader.rs} | 246 ++++++++++++++++++++++++++++------ src/pd/mod.rs | 9 +- 4 files changed, 230 insertions(+), 88 deletions(-) rename src/pd/{util.rs => leader.rs} (59%) diff --git a/examples/pd-client.rs b/examples/pd-client.rs index a96d5f34..18b3130e 100644 --- a/examples/pd-client.rs +++ b/examples/pd-client.rs @@ -31,4 +31,8 @@ fn main() { println!("Store: {:?}", store); let region = pd_client.get_region(b"abc").wait(); println!("Region: {:?}", region); + for _ in 0..10 { + let tso = pd_client.get_ts().wait(); + println!("TSO: {:?}", tso); + } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 00a6e5f5..691b2ae4 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -15,13 +15,12 @@ use std::fmt; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use futures::sync::oneshot; use futures::Future; use grpc::{CallOption, EnvBuilder}; use kvproto::metapb; -use kvproto::pdpb::{self, Member}; +use kvproto::pdpb::{GetStoreRequest, GetRegionByIDRequest, GetRegionRequest, Member}; -use super::util::{check_resp_header, validate_endpoints, LeaderClient, Request}; +use super::leader::{check_resp_header, validate_endpoints, LeaderClient, Request}; use super::{Error, PdClient, RegionInfo, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; use pd::{PdFuture, PdTimestamp}; use util::security::SecurityManager; @@ -45,16 +44,6 @@ pub struct PdRpcClient { leader: Arc>, } -macro_rules! request { - ($cluster_id:expr, $type:ty) => {{ - let mut request = <$type>::new(); - let mut header = pdpb::RequestHeader::new(); - header.set_cluster_id($cluster_id); - request.set_header(header); - request - }}; -} - impl PdRpcClient { pub fn new(endpoints: &[&str], security_mgr: Arc) -> Result { let env = Arc::new( @@ -67,12 +56,7 @@ impl PdRpcClient { Ok(PdRpcClient { cluster_id: members.get_header().get_cluster_id(), - leader: Arc::new(RwLock::new(LeaderClient::new( - env, - security_mgr, - client, - members, - ))), + leader: LeaderClient::new(env, security_mgr, client, members), }) } @@ -91,10 +75,10 @@ impl PdRpcClient { ) -> impl Future), Error = Error> { let timer = Instant::now(); - let mut req = request!(self.cluster_id, pdpb::GetRegionRequest); + let mut req = request!(self.cluster_id, GetRegionRequest); req.set_region_key(key.to_owned()); - let executor = move |client: &RwLock, req: pdpb::GetRegionRequest| { + let executor = move |client: &RwLock, req: GetRegionRequest| { let receiver = client .rl() .client @@ -125,10 +109,10 @@ impl PdRpcClient { fn get_store_async(&self, store_id: u64) -> impl Future { let timer = Instant::now(); - let mut req = request!(self.cluster_id, pdpb::GetStoreRequest); + let mut req = request!(self.cluster_id, GetStoreRequest); req.set_store_id(store_id); - let executor = move |client: &RwLock, req: pdpb::GetStoreRequest| { + let executor = move |client: &RwLock, req: GetStoreRequest| { let receiver = client .rl() .client @@ -150,27 +134,6 @@ impl PdRpcClient { Ok(self.cluster_id) } - pub fn get_ts(&self) -> Result { - self.get_ts_async().wait() - } - - pub fn get_ts_async(&self) -> PdFuture { - let timer = Instant::now(); - - let mut req = request!(self.cluster_id, pdpb::TsoRequest); - req.set_count(1); - - let (tx, rx) = oneshot::channel::(); - let leader = self.leader.wl(); - 
leader.tso_requests_sender.unbounded_send(tx).unwrap(); - Box::new(rx.map_err(Error::Canceled).and_then(move |ts| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_ts"]) - .observe(duration_to_sec(timer.elapsed())); - Ok(ts) - })) - } - pub fn on_reconnect(&self, f: Box) { let mut leader = self.leader.wl(); leader.on_reconnect = Some(f); @@ -213,10 +176,10 @@ impl PdClient for PdRpcClient { fn get_region_by_id(&self, region_id: u64) -> PdFuture> { let timer = Instant::now(); - let mut req = request!(self.cluster_id, pdpb::GetRegionByIDRequest); + let mut req = request!(self.cluster_id, GetRegionByIDRequest); req.set_region_id(region_id); - let executor = move |client: &RwLock, req: pdpb::GetRegionByIDRequest| { + let executor = move |client: &RwLock, req: GetRegionByIDRequest| { let handler = client .rl() .client @@ -237,6 +200,10 @@ impl PdClient for PdRpcClient { self.request(req, executor, LEADER_CHANGE_RETRY).execute() } + + fn get_ts(&self) -> PdFuture { + self.leader.wl().get_ts() + } } impl fmt::Debug for PdRpcClient { diff --git a/src/pd/util.rs b/src/pd/leader.rs similarity index 59% rename from src/pd/util.rs rename to src/pd/leader.rs index 26d9c6be..9b03909e 100644 --- a/src/pd/util.rs +++ b/src/pd/leader.rs @@ -1,26 +1,27 @@ use fxhash::FxHashSet as HashSet; use std::result; -use std::sync::Arc; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; +use std::thread::{self, JoinHandle}; use std::time::Duration; use std::time::Instant; use futures::future::{loop_fn, ok, Loop}; use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::sync::oneshot; -use futures::task::Task; -use futures::Future; -use grpc::{CallOption, ChannelBuilder, ClientDuplexReceiver, ClientDuplexSender, Environment}; +use futures::{Future, Sink, Stream}; +use grpc::{CallOption, ChannelBuilder, Environment, WriteFlags}; use kvproto::pdpb::{ GetMembersRequest, GetMembersResponse, ResponseHeader, TsoRequest, TsoResponse, }; use kvproto::pdpb_grpc::PdClient; use tokio_timer::timer::Handle; -use super::{Error, PdFuture, PdTimestamp, Result, REQUEST_TIMEOUT}; +use super::{Error, PdFuture, PdTimestamp, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; +use tokio_core::reactor::{Core, Handle as OtherHandle}; use util::security::SecurityManager; +use util::time::duration_to_sec; use util::timer::GLOBAL_TIMER_HANDLE; -use util::{Either, HandyRwLock}; +use util::HandyRwLock; macro_rules! box_err { ($e:expr) => ({ @@ -33,19 +34,196 @@ macro_rules! box_err { }); } +macro_rules! 
request { + ($cluster_id:expr, $type:ty) => {{ + let mut request = <$type>::new(); + let mut header = ::kvproto::pdpb::RequestHeader::new(); + header.set_cluster_id($cluster_id); + request.set_header(header); + request + }}; +} + +type TsoChannel = oneshot::Sender; + +pub enum TsoTask { + Init, + Request, + Response(Vec>, TsoResponse), +} + +struct TsoDispatcher { + task_tx: Option>>, + rpc_tx: UnboundedSender, + rpc_rx: Option>, + + handle: Option>, + pending: Option>, + buffer: Option>, + batch: Vec, +} + +impl Drop for TsoDispatcher { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.join().unwrap(); + } + } +} + +impl TsoDispatcher { + fn new() -> TsoDispatcher { + let (rpc_tx, rpc_rx) = unbounded(); + TsoDispatcher { + task_tx: None, + rpc_tx, + rpc_rx: Some(rpc_rx), + handle: None, + buffer: Some(Vec::with_capacity(8)), + batch: Vec::with_capacity(8), + pending: None, + } + } + + fn start(&mut self, client: Arc>) { + if self.handle.is_none() { + info!("starting tso dispatcher thread"); + let (task_tx, task_rx) = unbounded(); + task_tx.unbounded_send(Some(TsoTask::Init)).unwrap(); + self.task_tx = Some(task_tx); + self.handle = Some( + thread::Builder::new() + .name("dispatcher thread".to_owned()) + .spawn(move || Self::poll(&client, task_rx)) + .unwrap(), + ) + } else { + warn!("tso sender and receiver are stale, refreshing.."); + let (rpc_tx, rpc_rx) = unbounded(); + self.rpc_tx = rpc_tx; + self.rpc_rx = Some(rpc_rx); + self.schedule(TsoTask::Init); + } + } + + fn schedule(&self, task: TsoTask) { + self.task_tx + .as_ref() + .unwrap() + .unbounded_send(Some(task)) + .unwrap(); + } + + fn poll(client: &Arc>, rx: UnboundedReceiver>) { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + { + let f = rx.take_while(|t| Ok(t.is_some())).for_each(|t| { + Self::dispatch(&client, t.unwrap(), &handle); + Ok(()) + }); + core.run(f).unwrap(); + } + } + + fn init(client: &Arc>, handle: &OtherHandle) { + let client = Arc::clone(client); + let (tx, rx) = client.wl().client.tso().unwrap(); + let rpc_rx = client.wl().tso.rpc_rx.take().unwrap(); + handle.spawn( + tx.sink_map_err(Error::Grpc) + .send_all(rpc_rx.then(|r| match r { + Ok(r) => Ok((r, WriteFlags::default())), + Err(()) => Err(Error::Other(box_err!("failed to recv tso requests"))), + })).then(|r| match r { + Ok((mut sender, _)) => { + sender.get_mut().cancel(); + Ok(()) + } + Err(e) => { + error!("failed to send tso requests: {:?}", e); + Err(()) + } + }), + ); + handle.spawn( + rx.for_each(move |resp| { + let mut client = client.wl(); + let tso = &mut client.tso; + let pending = tso.pending.take().unwrap(); + tso.schedule(TsoTask::Response(pending, resp)); + if !tso.batch.is_empty() { + /* schedule another batch of request */ + tso.schedule(TsoTask::Request); + } + Ok(()) + }).map_err(|e| panic!("unexpected error: {:?}", e)), + ); + } + + fn request(client: &Arc>) { + let mut client = client.wl(); + let cluster_id = client.cluster_id; + let tso = &mut client.tso; + let mut batch = tso.buffer.take().unwrap(); + batch.extend(tso.batch.drain(..)); + let mut request = request!(cluster_id, TsoRequest); + request.set_count(batch.len() as u32); + tso.pending = Some(batch); + tso.rpc_tx.unbounded_send(request).unwrap(); + } + + fn response( + client: &Arc>, + mut requests: Vec, + response: &TsoResponse, + ) { + let timestamp = response.get_timestamp(); + for (offset, request) in requests.drain(..).enumerate() { + request + .send(PdTimestamp { + physical: timestamp.physical, + logical: timestamp.logical 
+ offset as i64, + }).unwrap(); + } + client.wl().tso.buffer = Some(requests); + } + + fn dispatch(client: &Arc>, task: TsoTask, handle: &OtherHandle) { + match task { + TsoTask::Request => Self::request(client), + TsoTask::Response(requests, response) => Self::response(client, requests, &response), + TsoTask::Init => Self::init(client, handle), + } + } + + fn get_ts(&mut self) -> PdFuture { + let timer = Instant::now(); + let (tx, rx) = oneshot::channel::(); + self.batch.push(tx); + if self.pending.is_none() { + /* schedule tso request to run */ + self.schedule(TsoTask::Request); + } + Box::new(rx.map_err(Error::Canceled).and_then(move |ts| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_ts"]) + .observe(duration_to_sec(timer.elapsed())); + Ok(ts) + })) + } +} + pub struct LeaderClient { - env: Arc, pub client: PdClient, pub members: GetMembersResponse, - security_mgr: Arc, pub on_reconnect: Option>, - pub tso_sender: Either>, UnboundedSender>, - pub tso_receiver: Either>, Task>, - - pub tso_requests: UnboundedReceiver>, - pub tso_requests_sender: UnboundedSender>, + env: Arc, + cluster_id: u64, + security_mgr: Arc, last_update: Instant, + tso: TsoDispatcher, } impl LeaderClient { @@ -54,23 +232,25 @@ impl LeaderClient { security_mgr: Arc, client: PdClient, members: GetMembersResponse, - ) -> LeaderClient { - let (tx, rx) = client.tso().unwrap(); - let (tso_sender, tso_receiver) = unbounded(); - LeaderClient { + ) -> Arc> { + let cluster_id = members.get_header().get_cluster_id(); + let client = Arc::new(RwLock::new(LeaderClient { env, - tso_sender: Either::Left(Some(tx)), - tso_receiver: Either::Left(Some(rx)), client, members, security_mgr, on_reconnect: None, - last_update: Instant::now(), + tso: TsoDispatcher::new(), + cluster_id, + })); - tso_requests: tso_receiver, - tso_requests_sender: tso_sender, - } + client.wl().tso.start(Arc::clone(&client)); + client + } + + pub fn get_ts(&mut self) -> PdFuture { + self.tso.get_ts() } } @@ -337,8 +517,7 @@ pub fn check_resp_header(header: &ResponseHeader) -> Result<()> { } // Re-establish connection with PD leader in synchronized fashion. -pub fn reconnect(leader: &RwLock) -> Result<()> { - println!("try reconnect"); +pub fn reconnect(leader: &Arc>) -> Result<()> { let ((client, members), start) = { let leader = leader.rl(); if leader.last_update.elapsed() < Duration::from_secs(RECONNECT_INTERVAL_SEC) { @@ -354,26 +533,15 @@ pub fn reconnect(leader: &RwLock) -> Result<()> { }; { + let leader_clone = Arc::clone(leader); let mut leader = leader.wl(); - let (tx, rx) = client.tso().unwrap(); - warn!("tso sender and receiver are stale, refreshing.."); - - // Try to cancel an unused tso sender. 
- if let Either::Left(Some(ref mut r)) = leader.tso_sender { - info!("cancel tso sender"); - r.cancel(); - } - leader.tso_sender = Either::Left(Some(tx)); - if let Either::Right(ref mut task) = leader.tso_receiver { - task.notify(); - } - leader.tso_receiver = Either::Left(Some(rx)); leader.client = client; leader.members = members; leader.last_update = Instant::now(); if let Some(ref on_reconnect) = leader.on_reconnect { on_reconnect(); } + leader.tso.start(leader_clone); } warn!("updating PD client done, spent {:?}", start.elapsed()); Ok(()) diff --git a/src/pd/mod.rs b/src/pd/mod.rs index 07afad72..97fd99e1 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -1,11 +1,10 @@ +#[macro_use] +mod leader; mod client; -mod util; pub mod errors; pub use self::client::PdRpcClient; pub use self::errors::{Error, Result}; -pub use self::util::validate_endpoints; -pub use self::util::RECONNECT_INTERVAL_SEC; use std::ops::Deref; @@ -39,6 +38,7 @@ impl Deref for RegionInfo { pub const INVALID_ID: u64 = 0; const REQUEST_TIMEOUT: u64 = 2; // 2s +#[derive(Debug)] pub struct PdTimestamp { pub physical: i64, pub logical: i64, @@ -81,4 +81,7 @@ pub trait PdClient: Send + Sync { // // Please note that this method should only be called once. fn handle_reconnect(&self, _: F) {} + + // get a timestamp from PD + fn get_ts(&self) -> PdFuture; } From 9c306e7a9a75379e445816faf777dde071832e7a Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 3 Oct 2018 23:13:21 +0800 Subject: [PATCH 2/5] WIP: Use the same reactor thread for all PD call Signed-off-by: Xiaoguang Sun --- src/lib.rs | 2 +- src/pd/client.rs | 26 ++++++--- src/pd/leader.rs | 135 +++++++++++++++++++++++------------------------ 3 files changed, 86 insertions(+), 77 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 339e14ac..22ac7925 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,6 @@ extern crate lazy_static; #[cfg(target_os = "linux")] extern crate libc; -pub mod pd; #[macro_use] pub mod util; +pub mod pd; diff --git a/src/pd/client.rs b/src/pd/client.rs index 691b2ae4..96c11909 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -18,10 +18,11 @@ use std::time::{Duration, Instant}; use futures::Future; use grpc::{CallOption, EnvBuilder}; use kvproto::metapb; -use kvproto::pdpb::{GetStoreRequest, GetRegionByIDRequest, GetRegionRequest, Member}; +use kvproto::pdpb::{GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, Member}; use super::leader::{check_resp_header, validate_endpoints, LeaderClient, Request}; use super::{Error, PdClient, RegionInfo, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; +use futures::sync::oneshot; use pd::{PdFuture, PdTimestamp}; use util::security::SecurityManager; use util::time::duration_to_sec; @@ -103,7 +104,7 @@ impl PdRpcClient { Ok((region, leader)) })) as PdFuture<_> }; - self.request(req, executor, LEADER_CHANGE_RETRY).execute() + self.request(req, executor, LEADER_CHANGE_RETRY) } fn get_store_async(&self, store_id: u64) -> impl Future { @@ -127,7 +128,7 @@ impl PdRpcClient { })) as PdFuture<_> }; - self.request(req, executor, LEADER_CHANGE_RETRY).execute() + self.request(req, executor, LEADER_CHANGE_RETRY) } pub fn get_cluster_id(&self) -> Result { @@ -139,13 +140,24 @@ impl PdRpcClient { leader.on_reconnect = Some(f); } - pub fn request(&self, req: Req, func: F, retry: usize) -> Request + pub fn request(&self, req: Req, func: F, retry: usize) -> PdFuture where Req: Clone + Send + 'static, - Resp: Send + 'static, + Resp: Send + fmt::Debug + 'static, F: FnMut(&RwLock, Req) -> PdFuture + Send + 
'static, { - Request::new(req, func, Arc::clone(&self.leader), retry) + let future = Request::new(req, func, Arc::clone(&self.leader), retry).execute(); + let (tx, rx) = oneshot::channel(); + let future = Box::new( + future + .and_then(move |resp| { + tx.send(resp).unwrap(); + Ok(()) + }) + .map_err(|e| panic!("{}", e)) + ); + self.leader.wl().schedule(future); + Box::new(rx.map_err(Error::Canceled).and_then(Ok)) } } @@ -198,7 +210,7 @@ impl PdClient for PdRpcClient { })) as PdFuture<_> }; - self.request(req, executor, LEADER_CHANGE_RETRY).execute() + self.request(req, executor, LEADER_CHANGE_RETRY) } fn get_ts(&self) -> PdFuture { diff --git a/src/pd/leader.rs b/src/pd/leader.rs index 9b03909e..7af52095 100644 --- a/src/pd/leader.rs +++ b/src/pd/leader.rs @@ -23,17 +23,6 @@ use util::time::duration_to_sec; use util::timer::GLOBAL_TIMER_HANDLE; use util::HandyRwLock; -macro_rules! box_err { - ($e:expr) => ({ - use std::error::Error; - let e: Box = format!("[{}:{}]: {}", file!(), line!(), $e).into(); - e.into() - }); - ($f:tt, $($arg:expr),+) => ({ - box_err!(format!($f, $($arg),+)) - }); -} - macro_rules! request { ($cluster_id:expr, $type:ty) => {{ let mut request = <$type>::new(); @@ -46,24 +35,27 @@ macro_rules! request { type TsoChannel = oneshot::Sender; -pub enum TsoTask { - Init, - Request, - Response(Vec>, TsoResponse), +type PdRequest = Box + Send>; + +pub enum PdTask { + TsoInit, + TsoRequest, + TsoResponse(Vec>, TsoResponse), + Request(PdRequest), } -struct TsoDispatcher { - task_tx: Option>>, - rpc_tx: UnboundedSender, - rpc_rx: Option>, +struct PdReactor { + task_tx: Option>>, + tso_tx: UnboundedSender, + tso_rx: Option>, handle: Option>, - pending: Option>, - buffer: Option>, - batch: Vec, + tso_pending: Option>, + tso_buffer: Option>, + tso_batch: Vec, } -impl Drop for TsoDispatcher { +impl Drop for PdReactor { fn drop(&mut self) { if let Some(handle) = self.handle.take() { handle.join().unwrap(); @@ -71,25 +63,25 @@ impl Drop for TsoDispatcher { } } -impl TsoDispatcher { - fn new() -> TsoDispatcher { - let (rpc_tx, rpc_rx) = unbounded(); - TsoDispatcher { +impl PdReactor { + fn new() -> PdReactor { + let (tso_tx, tso_rx) = unbounded(); + PdReactor { task_tx: None, - rpc_tx, - rpc_rx: Some(rpc_rx), + tso_tx, + tso_rx: Some(tso_rx), handle: None, - buffer: Some(Vec::with_capacity(8)), - batch: Vec::with_capacity(8), - pending: None, + tso_buffer: Some(Vec::with_capacity(8)), + tso_batch: Vec::with_capacity(8), + tso_pending: None, } } fn start(&mut self, client: Arc>) { if self.handle.is_none() { - info!("starting tso dispatcher thread"); + info!("starting pd reactor thread"); let (task_tx, task_rx) = unbounded(); - task_tx.unbounded_send(Some(TsoTask::Init)).unwrap(); + task_tx.unbounded_send(Some(PdTask::TsoInit)).unwrap(); self.task_tx = Some(task_tx); self.handle = Some( thread::Builder::new() @@ -99,14 +91,14 @@ impl TsoDispatcher { ) } else { warn!("tso sender and receiver are stale, refreshing.."); - let (rpc_tx, rpc_rx) = unbounded(); - self.rpc_tx = rpc_tx; - self.rpc_rx = Some(rpc_rx); - self.schedule(TsoTask::Init); + let (tso_tx, tso_rx) = unbounded(); + self.tso_tx = tso_tx; + self.tso_rx = Some(tso_rx); + self.schedule(PdTask::TsoInit); } } - fn schedule(&self, task: TsoTask) { + fn schedule(&self, task: PdTask) { self.task_tx .as_ref() .unwrap() @@ -114,7 +106,7 @@ impl TsoDispatcher { .unwrap(); } - fn poll(client: &Arc>, rx: UnboundedReceiver>) { + fn poll(client: &Arc>, rx: UnboundedReceiver>) { let mut core = Core::new().unwrap(); let handle = 
core.handle(); { @@ -129,10 +121,10 @@ impl TsoDispatcher { fn init(client: &Arc>, handle: &OtherHandle) { let client = Arc::clone(client); let (tx, rx) = client.wl().client.tso().unwrap(); - let rpc_rx = client.wl().tso.rpc_rx.take().unwrap(); + let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); handle.spawn( tx.sink_map_err(Error::Grpc) - .send_all(rpc_rx.then(|r| match r { + .send_all(tso_rx.then(|r| match r { Ok(r) => Ok((r, WriteFlags::default())), Err(()) => Err(Error::Other(box_err!("failed to recv tso requests"))), })).then(|r| match r { @@ -149,31 +141,31 @@ impl TsoDispatcher { handle.spawn( rx.for_each(move |resp| { let mut client = client.wl(); - let tso = &mut client.tso; - let pending = tso.pending.take().unwrap(); - tso.schedule(TsoTask::Response(pending, resp)); - if !tso.batch.is_empty() { - /* schedule another batch of request */ - tso.schedule(TsoTask::Request); + let reactor = &mut client.reactor; + let tso_pending = reactor.tso_pending.take().unwrap(); + reactor.schedule(PdTask::TsoResponse(tso_pending, resp)); + if !reactor.tso_batch.is_empty() { + /* schedule another tso_batch of request */ + reactor.schedule(PdTask::TsoRequest); } Ok(()) }).map_err(|e| panic!("unexpected error: {:?}", e)), ); } - fn request(client: &Arc>) { + fn tso_request(client: &Arc>) { let mut client = client.wl(); let cluster_id = client.cluster_id; - let tso = &mut client.tso; - let mut batch = tso.buffer.take().unwrap(); - batch.extend(tso.batch.drain(..)); + let reactor = &mut client.reactor; + let mut tso_batch = reactor.tso_buffer.take().unwrap(); + tso_batch.extend(reactor.tso_batch.drain(..)); let mut request = request!(cluster_id, TsoRequest); - request.set_count(batch.len() as u32); - tso.pending = Some(batch); - tso.rpc_tx.unbounded_send(request).unwrap(); + request.set_count(tso_batch.len() as u32); + reactor.tso_pending = Some(tso_batch); + reactor.tso_tx.unbounded_send(request).unwrap(); } - fn response( + fn tso_response( client: &Arc>, mut requests: Vec, response: &TsoResponse, @@ -186,24 +178,25 @@ impl TsoDispatcher { logical: timestamp.logical + offset as i64, }).unwrap(); } - client.wl().tso.buffer = Some(requests); + client.wl().reactor.tso_buffer = Some(requests); } - fn dispatch(client: &Arc>, task: TsoTask, handle: &OtherHandle) { + fn dispatch(client: &Arc>, task: PdTask, handle: &OtherHandle) { match task { - TsoTask::Request => Self::request(client), - TsoTask::Response(requests, response) => Self::response(client, requests, &response), - TsoTask::Init => Self::init(client, handle), + PdTask::TsoRequest => Self::tso_request(client), + PdTask::TsoResponse(requests, response) => Self::tso_response(client, requests, &response), + PdTask::TsoInit => Self::init(client, handle), + PdTask::Request(task) => handle.spawn(task), } } fn get_ts(&mut self) -> PdFuture { let timer = Instant::now(); let (tx, rx) = oneshot::channel::(); - self.batch.push(tx); - if self.pending.is_none() { + self.tso_batch.push(tx); + if self.tso_pending.is_none() { /* schedule tso request to run */ - self.schedule(TsoTask::Request); + self.schedule(PdTask::TsoRequest); } Box::new(rx.map_err(Error::Canceled).and_then(move |ts| { PD_REQUEST_HISTOGRAM_VEC @@ -223,7 +216,7 @@ pub struct LeaderClient { cluster_id: u64, security_mgr: Arc, last_update: Instant, - tso: TsoDispatcher, + reactor: PdReactor, } impl LeaderClient { @@ -241,16 +234,20 @@ impl LeaderClient { security_mgr, on_reconnect: None, last_update: Instant::now(), - tso: TsoDispatcher::new(), + reactor: PdReactor::new(), cluster_id, 
})); - client.wl().tso.start(Arc::clone(&client)); + client.wl().reactor.start(Arc::clone(&client)); client } pub fn get_ts(&mut self) -> PdFuture { - self.tso.get_ts() + self.reactor.get_ts() + } + + pub fn schedule(&self, task: PdRequest) { + self.reactor.schedule(PdTask::Request(task)); } } @@ -541,7 +538,7 @@ pub fn reconnect(leader: &Arc>) -> Result<()> { if let Some(ref on_reconnect) = leader.on_reconnect { on_reconnect(); } - leader.tso.start(leader_clone); + leader.reactor.start(leader_clone); } warn!("updating PD client done, spent {:?}", start.elapsed()); Ok(()) From 730eef42ffb5fda0aa931cf88f2f4b04eb14c75c Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 4 Oct 2018 09:48:07 +0800 Subject: [PATCH 3/5] Use impl Future instead of Boxed future Signed-off-by: Xiaoguang Sun --- examples/pd-client.rs | 12 +-- src/pd/client.rs | 202 +++++++++++++++++++++++++----------------- src/pd/leader.rs | 13 +-- src/pd/mod.rs | 14 +-- 4 files changed, 140 insertions(+), 101 deletions(-) diff --git a/examples/pd-client.rs b/examples/pd-client.rs index 18b3130e..1ecac7a1 100644 --- a/examples/pd-client.rs +++ b/examples/pd-client.rs @@ -7,7 +7,6 @@ extern crate tikv_client; use std::env; use std::sync::Arc; -use futures::future::Future; use simplelog::*; use tikv_client::pd::*; @@ -26,13 +25,10 @@ fn main() { let pd_client = PdRpcClient::new(&addr, Arc::clone(&security_manager)) .unwrap_or_else(|e| panic!("failed to create rpc client: {:?}", e)); - println!("Cluster ID: {}", pd_client.get_cluster_id().unwrap()); - let store = pd_client.get_store(1).wait(); - println!("Store: {:?}", store); - let region = pd_client.get_region(b"abc").wait(); - println!("Region: {:?}", region); + println!("Cluster ID: {:?}", pd_client.get_cluster_id()); + println!("Store: {:?}", pd_client.get_store(1)); + println!("Region: {:?}", pd_client.get_region(b"abc")); for _ in 0..10 { - let tso = pd_client.get_ts().wait(); - println!("TSO: {:?}", tso); + println!("TSO: {:?}", pd_client.get_ts()); } } diff --git a/src/pd/client.rs b/src/pd/client.rs index 96c11909..0a47f31d 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -15,7 +15,7 @@ use std::fmt; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use futures::Future; +use futures::{future, Future}; use grpc::{CallOption, EnvBuilder}; use kvproto::metapb; use kvproto::pdpb::{GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, Member}; @@ -61,15 +61,109 @@ impl PdRpcClient { }) } - #[inline] - fn call_option() -> CallOption { - CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)) + pub fn get_cluster_id(&self) -> Result { + Ok(self.cluster_id) + } + + pub fn on_reconnect(&self, f: Box) { + let mut leader = self.leader.wl(); + leader.on_reconnect = Some(f); + } + + pub fn get_cluster_config_async(&self) -> impl Future { + future::ok(metapb::Cluster::new()) + } + + pub fn get_all_stores_async(&self) -> impl Future, Error = Error> { + future::ok(Vec::new()) } - pub fn get_leader(&self) -> Member { + pub fn get_store_async( + &self, + store_id: u64, + ) -> impl Future { + let timer = Instant::now(); + + let mut req = request!(self.cluster_id, GetStoreRequest); + req.set_store_id(store_id); + + let executor = move |client: &RwLock, req: GetStoreRequest| { + let receiver = client + .rl() + .client + .get_store_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_store"]) + 
.observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + Ok(resp.take_store()) + })) as PdFuture<_> + }; + + self.request(req, executor, LEADER_CHANGE_RETRY) + } + + pub fn get_region_async( + &self, + key: &[u8], + ) -> impl Future { + self.get_region_and_leader_async(key).and_then(|x| Ok(x.0)) + } + + pub fn get_region_info_async( + &self, + key: &[u8], + ) -> impl Future { + self.get_region_and_leader_async(key) + .and_then(|x| Ok(RegionInfo::new(x.0, x.1))) + } + + pub fn get_region_by_id_async( + &self, + region_id: u64, + ) -> impl Future, Error = Error> { + let timer = Instant::now(); + + let mut req = request!(self.cluster_id, GetRegionByIDRequest); + req.set_region_id(region_id); + + let executor = move |client: &RwLock, req: GetRegionByIDRequest| { + let handler = client + .rl() + .client + .get_region_by_id_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(handler.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_region_by_id"]) + .observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + if resp.has_region() { + Ok(Some(resp.take_region())) + } else { + Ok(None) + } + })) as PdFuture<_> + }; + + self.request(req, executor, LEADER_CHANGE_RETRY) + } + + pub fn get_ts_async(&self) -> impl Future { + self.leader.wl().get_ts() + } + + fn get_leader(&self) -> Member { self.leader.rl().members.get_leader().clone() } + #[inline] + fn call_option() -> CallOption { + CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT)) + } + fn get_region_and_leader_async( &self, key: &[u8], @@ -107,40 +201,12 @@ impl PdRpcClient { self.request(req, executor, LEADER_CHANGE_RETRY) } - fn get_store_async(&self, store_id: u64) -> impl Future { - let timer = Instant::now(); - - let mut req = request!(self.cluster_id, GetStoreRequest); - req.set_store_id(store_id); - - let executor = move |client: &RwLock, req: GetStoreRequest| { - let receiver = client - .rl() - .client - .get_store_async_opt(&req, Self::call_option()) - .unwrap(); - Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_store"]) - .observe(duration_to_sec(timer.elapsed())); - check_resp_header(resp.get_header())?; - Ok(resp.take_store()) - })) as PdFuture<_> - }; - - self.request(req, executor, LEADER_CHANGE_RETRY) - } - - pub fn get_cluster_id(&self) -> Result { - Ok(self.cluster_id) - } - - pub fn on_reconnect(&self, f: Box) { - let mut leader = self.leader.wl(); - leader.on_reconnect = Some(f); - } - - pub fn request(&self, req: Req, func: F, retry: usize) -> PdFuture + fn request( + &self, + req: Req, + func: F, + retry: usize, + ) -> impl Future where Req: Clone + Send + 'static, Resp: Send + fmt::Debug + 'static, @@ -153,11 +219,10 @@ impl PdRpcClient { .and_then(move |resp| { tx.send(resp).unwrap(); Ok(()) - }) - .map_err(|e| panic!("{}", e)) + }).map_err(|e| panic!("{}", e)), ); self.leader.wl().schedule(future); - Box::new(rx.map_err(Error::Canceled).and_then(Ok)) + rx.map_err(Error::Canceled).and_then(Ok) } } @@ -166,55 +231,32 @@ impl PdClient for PdRpcClient { Ok(self.cluster_id) } - fn get_cluster_config(&self) -> PdFuture { - unimplemented!() + fn get_store(&self, store_id: u64) -> Result { + self.get_store_async(store_id).wait() } - fn get_store(&self, store_id: u64) -> PdFuture { - Box::new(self.get_store_async(store_id).and_then(Ok)) + fn get_all_stores(&self) -> Result> { + self.get_all_stores_async().wait() } - fn 
get_region(&self, key: &[u8]) -> PdFuture { - Box::new(self.get_region_and_leader_async(key).and_then(|x| Ok(x.0))) + fn get_cluster_config(&self) -> Result { + self.get_cluster_config_async().wait() } - fn get_region_info(&self, key: &[u8]) -> PdFuture { - Box::new( - self.get_region_and_leader_async(key) - .and_then(|x| Ok(RegionInfo::new(x.0, x.1))), - ) + fn get_region(&self, key: &[u8]) -> Result { + self.get_region_async(key).wait() } - fn get_region_by_id(&self, region_id: u64) -> PdFuture> { - let timer = Instant::now(); - - let mut req = request!(self.cluster_id, GetRegionByIDRequest); - req.set_region_id(region_id); - - let executor = move |client: &RwLock, req: GetRegionByIDRequest| { - let handler = client - .rl() - .client - .get_region_by_id_async_opt(&req, Self::call_option()) - .unwrap(); - Box::new(handler.map_err(Error::Grpc).and_then(move |mut resp| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_region_by_id"]) - .observe(duration_to_sec(timer.elapsed())); - check_resp_header(resp.get_header())?; - if resp.has_region() { - Ok(Some(resp.take_region())) - } else { - Ok(None) - } - })) as PdFuture<_> - }; + fn get_region_info(&self, key: &[u8]) -> Result { + self.get_region_info_async(key).wait() + } - self.request(req, executor, LEADER_CHANGE_RETRY) + fn get_region_by_id(&self, region_id: u64) -> Result> { + self.get_region_by_id_async(region_id).wait() } - fn get_ts(&self) -> PdFuture { - self.leader.wl().get_ts() + fn get_ts(&self) -> Result { + self.get_ts_async().wait() } } diff --git a/src/pd/leader.rs b/src/pd/leader.rs index 7af52095..b4f7d0db 100644 --- a/src/pd/leader.rs +++ b/src/pd/leader.rs @@ -34,7 +34,6 @@ macro_rules! request { } type TsoChannel = oneshot::Sender; - type PdRequest = Box + Send>; pub enum PdTask { @@ -184,13 +183,15 @@ impl PdReactor { fn dispatch(client: &Arc>, task: PdTask, handle: &OtherHandle) { match task { PdTask::TsoRequest => Self::tso_request(client), - PdTask::TsoResponse(requests, response) => Self::tso_response(client, requests, &response), + PdTask::TsoResponse(requests, response) => { + Self::tso_response(client, requests, &response) + } PdTask::TsoInit => Self::init(client, handle), PdTask::Request(task) => handle.spawn(task), } } - fn get_ts(&mut self) -> PdFuture { + fn get_ts(&mut self) -> impl Future { let timer = Instant::now(); let (tx, rx) = oneshot::channel::(); self.tso_batch.push(tx); @@ -198,12 +199,12 @@ impl PdReactor { /* schedule tso request to run */ self.schedule(PdTask::TsoRequest); } - Box::new(rx.map_err(Error::Canceled).and_then(move |ts| { + rx.map_err(Error::Canceled).and_then(move |ts| { PD_REQUEST_HISTOGRAM_VEC .with_label_values(&["get_ts"]) .observe(duration_to_sec(timer.elapsed())); Ok(ts) - })) + }) } } @@ -242,7 +243,7 @@ impl LeaderClient { client } - pub fn get_ts(&mut self) -> PdFuture { + pub fn get_ts(&mut self) -> impl Future { self.reactor.get_ts() } diff --git a/src/pd/mod.rs b/src/pd/mod.rs index 97fd99e1..c73d3e65 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -57,25 +57,25 @@ pub trait PdClient: Send + Sync { fn get_cluster_id(&self) -> Result; // Get store information. - fn get_store(&self, store_id: u64) -> PdFuture; + fn get_store(&self, store_id: u64) -> Result; // Get all stores information. - fn get_all_stores(&self) -> PdFuture> { + fn get_all_stores(&self) -> Result> { unimplemented!(); } // Get cluster meta information. - fn get_cluster_config(&self) -> PdFuture; + fn get_cluster_config(&self) -> Result; // For route. // Get region which the key belong to. 
- fn get_region(&self, key: &[u8]) -> PdFuture; + fn get_region(&self, key: &[u8]) -> Result; // Get region info which the key belong to. - fn get_region_info(&self, key: &[u8]) -> PdFuture; + fn get_region_info(&self, key: &[u8]) -> Result; // Get region by region id. - fn get_region_by_id(&self, region_id: u64) -> PdFuture>; + fn get_region_by_id(&self, region_id: u64) -> Result>; // Register a handler to the client, it will be invoked after reconnecting to PD. // @@ -83,5 +83,5 @@ pub trait PdClient: Send + Sync { fn handle_reconnect(&self, _: F) {} // get a timestamp from PD - fn get_ts(&self) -> PdFuture; + fn get_ts(&self) -> Result; } From 37d55aa713fcd89140ed28196bb48bca786d8139 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Thu, 4 Oct 2018 22:29:21 +0800 Subject: [PATCH 4/5] Implement PD get_all_stores method Signed-off-by: Xiaoguang Sun --- examples/pd-client.rs | 1 + src/pd/client.rs | 21 +++++++++++++++++++-- src/pd/leader.rs | 2 +- src/pd/mod.rs | 4 +--- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/examples/pd-client.rs b/examples/pd-client.rs index 1ecac7a1..5d54a02c 100644 --- a/examples/pd-client.rs +++ b/examples/pd-client.rs @@ -27,6 +27,7 @@ fn main() { println!("Cluster ID: {:?}", pd_client.get_cluster_id()); println!("Store: {:?}", pd_client.get_store(1)); + println!("All Stores: {:?}", pd_client.get_all_stores()); println!("Region: {:?}", pd_client.get_region(b"abc")); for _ in 0..10 { println!("TSO: {:?}", pd_client.get_ts()); diff --git a/src/pd/client.rs b/src/pd/client.rs index 0a47f31d..1a098e2d 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -18,7 +18,7 @@ use std::time::{Duration, Instant}; use futures::{future, Future}; use grpc::{CallOption, EnvBuilder}; use kvproto::metapb; -use kvproto::pdpb::{GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, Member}; +use kvproto::pdpb::{GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, GetAllStoresRequest, Member}; use super::leader::{check_resp_header, validate_endpoints, LeaderClient, Request}; use super::{Error, PdClient, RegionInfo, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; @@ -75,7 +75,24 @@ impl PdRpcClient { } pub fn get_all_stores_async(&self) -> impl Future, Error = Error> { - future::ok(Vec::new()) + let timer = Instant::now(); + + let executor = move |client: &RwLock, req: GetAllStoresRequest| { + let receiver = client + .rl() + .client + .get_all_stores_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_all_stores"]) + .observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + Ok(resp.take_stores().into_vec()) + })) as PdFuture<_> + }; + + self.request(request!(self.cluster_id, GetAllStoresRequest), executor, LEADER_CHANGE_RETRY) } pub fn get_store_async( diff --git a/src/pd/leader.rs b/src/pd/leader.rs index b4f7d0db..329a1445 100644 --- a/src/pd/leader.rs +++ b/src/pd/leader.rs @@ -102,7 +102,7 @@ impl PdReactor { .as_ref() .unwrap() .unbounded_send(Some(task)) - .unwrap(); + .expect("unbounded send should never fail"); } fn poll(client: &Arc>, rx: UnboundedReceiver>) { diff --git a/src/pd/mod.rs b/src/pd/mod.rs index c73d3e65..17dc780e 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -60,9 +60,7 @@ pub trait PdClient: Send + Sync { fn get_store(&self, store_id: u64) -> Result; // Get all stores information. 
- fn get_all_stores(&self) -> Result> { - unimplemented!(); - } + fn get_all_stores(&self) -> Result>; // Get cluster meta information. fn get_cluster_config(&self) -> Result; From a5a1fa784b5199b918d41bb78b6c6ccd963cde71 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Fri, 5 Oct 2018 09:41:52 +0800 Subject: [PATCH 5/5] Change PdClient methods to async Signed-off-by: Xiaoguang Sun --- examples/pd-client.rs | 9 +- src/{pd => }/errors.rs | 0 src/lib.rs | 1 + src/pd/client.rs | 231 ++++++++++++++++------------------------- src/pd/mod.rs | 18 ++-- 5 files changed, 102 insertions(+), 157 deletions(-) rename src/{pd => }/errors.rs (100%) diff --git a/examples/pd-client.rs b/examples/pd-client.rs index 5d54a02c..10e9fbbc 100644 --- a/examples/pd-client.rs +++ b/examples/pd-client.rs @@ -7,6 +7,7 @@ extern crate tikv_client; use std::env; use std::sync::Arc; +use futures::Future; use simplelog::*; use tikv_client::pd::*; @@ -26,10 +27,10 @@ fn main() { .unwrap_or_else(|e| panic!("failed to create rpc client: {:?}", e)); println!("Cluster ID: {:?}", pd_client.get_cluster_id()); - println!("Store: {:?}", pd_client.get_store(1)); - println!("All Stores: {:?}", pd_client.get_all_stores()); - println!("Region: {:?}", pd_client.get_region(b"abc")); + println!("Store: {:?}", pd_client.get_store(1).wait()); + println!("All Stores: {:?}", pd_client.get_all_stores().wait()); + println!("Region: {:?}", pd_client.get_region(b"abc").wait()); for _ in 0..10 { - println!("TSO: {:?}", pd_client.get_ts()); + println!("TSO: {:?}", pd_client.get_ts().wait()); } } diff --git a/src/pd/errors.rs b/src/errors.rs similarity index 100% rename from src/pd/errors.rs rename to src/errors.rs diff --git a/src/lib.rs b/src/lib.rs index 22ac7925..0a9bec75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -40,4 +40,5 @@ extern crate libc; #[macro_use] pub mod util; +pub mod errors; pub mod pd; diff --git a/src/pd/client.rs b/src/pd/client.rs index 1a098e2d..63d12975 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -15,10 +15,12 @@ use std::fmt; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use futures::{future, Future}; +use futures::Future; use grpc::{CallOption, EnvBuilder}; use kvproto::metapb; -use kvproto::pdpb::{GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, GetAllStoresRequest, Member}; +use kvproto::pdpb::{ + GetAllStoresRequest, GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, Member, +}; use super::leader::{check_resp_header, validate_endpoints, LeaderClient, Request}; use super::{Error, PdClient, RegionInfo, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT}; @@ -32,14 +34,6 @@ const CQ_COUNT: usize = 1; const CLIENT_PREFIX: &str = "pd"; const LEADER_CHANGE_RETRY: usize = 10; -macro_rules! 
thd_name { - ($name:expr) => {{ - $crate::util::get_tag_from_thread_name() - .map(|tag| format!("{}::{}", $name, tag)) - .unwrap_or_else(|| $name.to_owned()) - }}; -} - pub struct PdRpcClient { cluster_id: u64, leader: Arc>, @@ -61,117 +55,6 @@ impl PdRpcClient { }) } - pub fn get_cluster_id(&self) -> Result { - Ok(self.cluster_id) - } - - pub fn on_reconnect(&self, f: Box) { - let mut leader = self.leader.wl(); - leader.on_reconnect = Some(f); - } - - pub fn get_cluster_config_async(&self) -> impl Future { - future::ok(metapb::Cluster::new()) - } - - pub fn get_all_stores_async(&self) -> impl Future, Error = Error> { - let timer = Instant::now(); - - let executor = move |client: &RwLock, req: GetAllStoresRequest| { - let receiver = client - .rl() - .client - .get_all_stores_async_opt(&req, Self::call_option()) - .unwrap(); - Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_all_stores"]) - .observe(duration_to_sec(timer.elapsed())); - check_resp_header(resp.get_header())?; - Ok(resp.take_stores().into_vec()) - })) as PdFuture<_> - }; - - self.request(request!(self.cluster_id, GetAllStoresRequest), executor, LEADER_CHANGE_RETRY) - } - - pub fn get_store_async( - &self, - store_id: u64, - ) -> impl Future { - let timer = Instant::now(); - - let mut req = request!(self.cluster_id, GetStoreRequest); - req.set_store_id(store_id); - - let executor = move |client: &RwLock, req: GetStoreRequest| { - let receiver = client - .rl() - .client - .get_store_async_opt(&req, Self::call_option()) - .unwrap(); - Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_store"]) - .observe(duration_to_sec(timer.elapsed())); - check_resp_header(resp.get_header())?; - Ok(resp.take_store()) - })) as PdFuture<_> - }; - - self.request(req, executor, LEADER_CHANGE_RETRY) - } - - pub fn get_region_async( - &self, - key: &[u8], - ) -> impl Future { - self.get_region_and_leader_async(key).and_then(|x| Ok(x.0)) - } - - pub fn get_region_info_async( - &self, - key: &[u8], - ) -> impl Future { - self.get_region_and_leader_async(key) - .and_then(|x| Ok(RegionInfo::new(x.0, x.1))) - } - - pub fn get_region_by_id_async( - &self, - region_id: u64, - ) -> impl Future, Error = Error> { - let timer = Instant::now(); - - let mut req = request!(self.cluster_id, GetRegionByIDRequest); - req.set_region_id(region_id); - - let executor = move |client: &RwLock, req: GetRegionByIDRequest| { - let handler = client - .rl() - .client - .get_region_by_id_async_opt(&req, Self::call_option()) - .unwrap(); - Box::new(handler.map_err(Error::Grpc).and_then(move |mut resp| { - PD_REQUEST_HISTOGRAM_VEC - .with_label_values(&["get_region_by_id"]) - .observe(duration_to_sec(timer.elapsed())); - check_resp_header(resp.get_header())?; - if resp.has_region() { - Ok(Some(resp.take_region())) - } else { - Ok(None) - } - })) as PdFuture<_> - }; - - self.request(req, executor, LEADER_CHANGE_RETRY) - } - - pub fn get_ts_async(&self) -> impl Future { - self.leader.wl().get_ts() - } - fn get_leader(&self) -> Member { self.leader.rl().members.get_leader().clone() } @@ -218,12 +101,7 @@ impl PdRpcClient { self.request(req, executor, LEADER_CHANGE_RETRY) } - fn request( - &self, - req: Req, - func: F, - retry: usize, - ) -> impl Future + fn request(&self, req: Req, func: F, retry: usize) -> PdFuture where Req: Clone + Send + 'static, Resp: Send + fmt::Debug + 'static, @@ -239,7 +117,7 @@ impl PdRpcClient { }).map_err(|e| 
panic!("{}", e)), ); self.leader.wl().schedule(future); - rx.map_err(Error::Canceled).and_then(Ok) + Box::new(rx.map_err(Error::Canceled).and_then(Ok)) } } @@ -248,32 +126,101 @@ impl PdClient for PdRpcClient { Ok(self.cluster_id) } - fn get_store(&self, store_id: u64) -> Result { - self.get_store_async(store_id).wait() + fn handle_reconnect(&self, f: F) { + let mut leader = self.leader.wl(); + leader.on_reconnect = Some(Box::new(f)); } - fn get_all_stores(&self) -> Result> { - self.get_all_stores_async().wait() + fn get_all_stores(&self) -> PdFuture> { + let timer = Instant::now(); + + let executor = move |client: &RwLock, req: GetAllStoresRequest| { + let receiver = client + .rl() + .client + .get_all_stores_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_all_stores"]) + .observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + Ok(resp.take_stores().into_vec()) + })) as PdFuture<_> + }; + + self.request( + request!(self.cluster_id, GetAllStoresRequest), + executor, + LEADER_CHANGE_RETRY, + ) } - fn get_cluster_config(&self) -> Result { - self.get_cluster_config_async().wait() + fn get_store(&self, store_id: u64) -> PdFuture { + let timer = Instant::now(); + + let mut req = request!(self.cluster_id, GetStoreRequest); + req.set_store_id(store_id); + + let executor = move |client: &RwLock, req: GetStoreRequest| { + let receiver = client + .rl() + .client + .get_store_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_store"]) + .observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + Ok(resp.take_store()) + })) as PdFuture<_> + }; + + self.request(req, executor, LEADER_CHANGE_RETRY) } - fn get_region(&self, key: &[u8]) -> Result { - self.get_region_async(key).wait() + fn get_region(&self, key: &[u8]) -> PdFuture { + Box::new(self.get_region_and_leader_async(key).and_then(|x| Ok(x.0))) } - fn get_region_info(&self, key: &[u8]) -> Result { - self.get_region_info_async(key).wait() + fn get_region_info(&self, key: &[u8]) -> PdFuture { + Box::new( + self.get_region_and_leader_async(key) + .and_then(|x| Ok(RegionInfo::new(x.0, x.1))), + ) } - fn get_region_by_id(&self, region_id: u64) -> Result> { - self.get_region_by_id_async(region_id).wait() + fn get_region_by_id(&self, region_id: u64) -> PdFuture> { + let timer = Instant::now(); + + let mut req = request!(self.cluster_id, GetRegionByIDRequest); + req.set_region_id(region_id); + + let executor = move |client: &RwLock, req: GetRegionByIDRequest| { + let handler = client + .rl() + .client + .get_region_by_id_async_opt(&req, Self::call_option()) + .unwrap(); + Box::new(handler.map_err(Error::Grpc).and_then(move |mut resp| { + PD_REQUEST_HISTOGRAM_VEC + .with_label_values(&["get_region_by_id"]) + .observe(duration_to_sec(timer.elapsed())); + check_resp_header(resp.get_header())?; + if resp.has_region() { + Ok(Some(resp.take_region())) + } else { + Ok(None) + } + })) as PdFuture<_> + }; + + self.request(req, executor, LEADER_CHANGE_RETRY) } - fn get_ts(&self) -> Result { - self.get_ts_async().wait() + fn get_ts(&self) -> PdFuture { + Box::new(self.leader.wl().get_ts()) } } diff --git a/src/pd/mod.rs b/src/pd/mod.rs index 17dc780e..6c9203d7 100644 --- a/src/pd/mod.rs +++ b/src/pd/mod.rs @@ -2,9 +2,8 @@ mod leader; mod client; -pub mod errors; pub 
use self::client::PdRpcClient; -pub use self::errors::{Error, Result}; +pub use errors::{Error, Result}; use std::ops::Deref; @@ -57,23 +56,20 @@ pub trait PdClient: Send + Sync { fn get_cluster_id(&self) -> Result; // Get store information. - fn get_store(&self, store_id: u64) -> Result; + fn get_store(&self, store_id: u64) -> PdFuture; // Get all stores information. - fn get_all_stores(&self) -> Result>; - - // Get cluster meta information. - fn get_cluster_config(&self) -> Result; + fn get_all_stores(&self) -> PdFuture>; // For route. // Get region which the key belong to. - fn get_region(&self, key: &[u8]) -> Result; + fn get_region(&self, key: &[u8]) -> PdFuture; // Get region info which the key belong to. - fn get_region_info(&self, key: &[u8]) -> Result; + fn get_region_info(&self, key: &[u8]) -> PdFuture; // Get region by region id. - fn get_region_by_id(&self, region_id: u64) -> Result>; + fn get_region_by_id(&self, region_id: u64) -> PdFuture>; // Register a handler to the client, it will be invoked after reconnecting to PD. // @@ -81,5 +77,5 @@ pub trait PdClient: Send + Sync { fn handle_reconnect(&self, _: F) {} // get a timestamp from PD - fn get_ts(&self) -> Result; + fn get_ts(&self) -> PdFuture; }
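
For readers following the TSO path introduced in PATCH 1/5 and carried through the reactor in PATCH 2/5: callers park a oneshot sender in tso_batch, one TsoRequest covers the whole batch, and the response's logical value is fanned back out with per-caller offsets. The sketch below is a minimal, dependency-free illustration of that fan-out only, not code from the series: std::sync::mpsc channels stand in for the futures oneshot channels, and fake_tso_rpc stands in for the PD Tso gRPC stream, so none of these names exist in the real client.

use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug)]
struct PdTimestamp {
    physical: i64,
    logical: i64,
}

/// Pending get_ts() callers waiting for the next TSO response.
struct TsoBatch {
    waiters: Vec<Sender<PdTimestamp>>,
}

impl TsoBatch {
    fn new() -> TsoBatch {
        TsoBatch { waiters: Vec::new() }
    }

    /// Queue one caller; the returned receiver resolves once a response lands.
    fn get_ts(&mut self) -> Receiver<PdTimestamp> {
        let (tx, rx) = channel();
        self.waiters.push(tx);
        rx
    }

    /// Fan one (physical, logical) pair out to every waiter: the physical part
    /// is shared and each waiter gets logical + offset, mirroring what
    /// PdReactor::tso_response does in the series.
    fn on_response(&mut self, physical: i64, logical: i64) {
        for (offset, waiter) in self.waiters.drain(..).enumerate() {
            let _ = waiter.send(PdTimestamp {
                physical,
                logical: logical + offset as i64,
            });
        }
    }
}

/// Stand-in for one round trip of the PD TSO stream; the real request only
/// carries the number of timestamps wanted (TsoRequest::count).
fn fake_tso_rpc(count: usize) -> (i64, i64) {
    let _ = count;
    (1_538_000_000_000, 0)
}

fn main() {
    let mut batch = TsoBatch::new();

    // Several callers ask for a timestamp before the RPC fires.
    let receivers: Vec<Receiver<PdTimestamp>> = (0..3).map(|_| batch.get_ts()).collect();

    // One RPC covers the whole batch, then the response is fanned back out.
    let (physical, logical) = fake_tso_rpc(receivers.len());
    batch.on_response(physical, logical);

    for rx in receivers {
        println!("TSO: {:?}", rx.recv().unwrap());
    }
}

Under the same simplifying assumptions, the reactor in the series does the equivalent of fake_tso_rpc asynchronously on its own thread: tso_pending holds the in-flight batch while new callers keep accumulating in tso_batch, so at most one TSO RPC is outstanding per leader connection.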