Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions examples/pd-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ extern crate tikv_client;
use std::env;
use std::sync::Arc;

use futures::future::Future;
use futures::Future;
use simplelog::*;

use tikv_client::pd::*;
Expand All @@ -26,9 +26,11 @@ fn main() {
let pd_client = PdRpcClient::new(&addr, Arc::clone(&security_manager))
.unwrap_or_else(|e| panic!("failed to create rpc client: {:?}", e));

println!("Cluster ID: {}", pd_client.get_cluster_id().unwrap());
let store = pd_client.get_store(1).wait();
println!("Store: {:?}", store);
let region = pd_client.get_region(b"abc").wait();
println!("Region: {:?}", region);
println!("Cluster ID: {:?}", pd_client.get_cluster_id());
println!("Store: {:?}", pd_client.get_store(1).wait());
println!("All Stores: {:?}", pd_client.get_all_stores().wait());
println!("Region: {:?}", pd_client.get_region(b"abc").wait());
for _ in 0..10 {
println!("TSO: {:?}", pd_client.get_ts().wait());
}
}
File renamed without changes.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extern crate lazy_static;
#[cfg(target_os = "linux")]
extern crate libc;

pub mod pd;
#[macro_use]
pub mod util;
pub mod errors;
pub mod pd;
171 changes: 78 additions & 93 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ use std::fmt;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use futures::sync::oneshot;
use futures::Future;
use grpc::{CallOption, EnvBuilder};
use kvproto::metapb;
use kvproto::pdpb::{self, Member};
use kvproto::pdpb::{
GetAllStoresRequest, GetRegionByIDRequest, GetRegionRequest, GetStoreRequest, Member,
};

use super::util::{check_resp_header, validate_endpoints, LeaderClient, Request};
use super::leader::{check_resp_header, validate_endpoints, LeaderClient, Request};
use super::{Error, PdClient, RegionInfo, Result, PD_REQUEST_HISTOGRAM_VEC, REQUEST_TIMEOUT};
use futures::sync::oneshot;
use pd::{PdFuture, PdTimestamp};
use util::security::SecurityManager;
use util::time::duration_to_sec;
Expand All @@ -32,29 +34,11 @@ const CQ_COUNT: usize = 1;
const CLIENT_PREFIX: &str = "pd";
const LEADER_CHANGE_RETRY: usize = 10;

macro_rules! thd_name {
($name:expr) => {{
$crate::util::get_tag_from_thread_name()
.map(|tag| format!("{}::{}", $name, tag))
.unwrap_or_else(|| $name.to_owned())
}};
}

pub struct PdRpcClient {
cluster_id: u64,
leader: Arc<RwLock<LeaderClient>>,
}

macro_rules! request {
($cluster_id:expr, $type:ty) => {{
let mut request = <$type>::new();
let mut header = pdpb::RequestHeader::new();
header.set_cluster_id($cluster_id);
request.set_header(header);
request
}};
}

impl PdRpcClient {
pub fn new(endpoints: &[&str], security_mgr: Arc<SecurityManager>) -> Result<PdRpcClient> {
let env = Arc::new(
Expand All @@ -67,34 +51,29 @@ impl PdRpcClient {

Ok(PdRpcClient {
cluster_id: members.get_header().get_cluster_id(),
leader: Arc::new(RwLock::new(LeaderClient::new(
env,
security_mgr,
client,
members,
))),
leader: LeaderClient::new(env, security_mgr, client, members),
})
}

fn get_leader(&self) -> Member {
self.leader.rl().members.get_leader().clone()
}

#[inline]
fn call_option() -> CallOption {
CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT))
}

pub fn get_leader(&self) -> Member {
self.leader.rl().members.get_leader().clone()
}

fn get_region_and_leader_async(
&self,
key: &[u8],
) -> impl Future<Item = (metapb::Region, Option<metapb::Peer>), Error = Error> {
let timer = Instant::now();

let mut req = request!(self.cluster_id, pdpb::GetRegionRequest);
let mut req = request!(self.cluster_id, GetRegionRequest);
req.set_region_key(key.to_owned());

let executor = move |client: &RwLock<LeaderClient>, req: pdpb::GetRegionRequest| {
let executor = move |client: &RwLock<LeaderClient>, req: GetRegionRequest| {
let receiver = client
.rl()
.client
Expand All @@ -119,84 +98,86 @@ impl PdRpcClient {
Ok((region, leader))
})) as PdFuture<_>
};
self.request(req, executor, LEADER_CHANGE_RETRY).execute()
self.request(req, executor, LEADER_CHANGE_RETRY)
}

fn get_store_async(&self, store_id: u64) -> impl Future<Item = metapb::Store, Error = Error> {
let timer = Instant::now();
fn request<Req, Resp, F>(&self, req: Req, func: F, retry: usize) -> PdFuture<Resp>
where
Req: Clone + Send + 'static,
Resp: Send + fmt::Debug + 'static,
F: FnMut(&RwLock<LeaderClient>, Req) -> PdFuture<Resp> + Send + 'static,
{
let future = Request::new(req, func, Arc::clone(&self.leader), retry).execute();
let (tx, rx) = oneshot::channel();
let future = Box::new(
future
.and_then(move |resp| {
tx.send(resp).unwrap();
Ok(())
}).map_err(|e| panic!("{}", e)),
);
self.leader.wl().schedule(future);
Box::new(rx.map_err(Error::Canceled).and_then(Ok))
}
}

let mut req = request!(self.cluster_id, pdpb::GetStoreRequest);
req.set_store_id(store_id);
impl PdClient for PdRpcClient {
fn get_cluster_id(&self) -> Result<u64> {
Ok(self.cluster_id)
}

fn handle_reconnect<F: Fn() + Sync + Send + 'static>(&self, f: F) {
let mut leader = self.leader.wl();
leader.on_reconnect = Some(Box::new(f));
}

let executor = move |client: &RwLock<LeaderClient>, req: pdpb::GetStoreRequest| {
fn get_all_stores(&self) -> PdFuture<Vec<metapb::Store>> {
let timer = Instant::now();

let executor = move |client: &RwLock<LeaderClient>, req: GetAllStoresRequest| {
let receiver = client
.rl()
.client
.get_store_async_opt(&req, Self::call_option())
.get_all_stores_async_opt(&req, Self::call_option())
.unwrap();
Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| {
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_store"])
.with_label_values(&["get_all_stores"])
.observe(duration_to_sec(timer.elapsed()));
check_resp_header(resp.get_header())?;
Ok(resp.take_store())
Ok(resp.take_stores().into_vec())
})) as PdFuture<_>
};

self.request(req, executor, LEADER_CHANGE_RETRY).execute()
}

pub fn get_cluster_id(&self) -> Result<u64> {
Ok(self.cluster_id)
}

pub fn get_ts(&self) -> Result<PdTimestamp> {
self.get_ts_async().wait()
self.request(
request!(self.cluster_id, GetAllStoresRequest),
executor,
LEADER_CHANGE_RETRY,
)
}

pub fn get_ts_async(&self) -> PdFuture<PdTimestamp> {
fn get_store(&self, store_id: u64) -> PdFuture<metapb::Store> {
let timer = Instant::now();

let mut req = request!(self.cluster_id, pdpb::TsoRequest);
req.set_count(1);

let (tx, rx) = oneshot::channel::<PdTimestamp>();
let leader = self.leader.wl();
leader.tso_requests_sender.unbounded_send(tx).unwrap();
Box::new(rx.map_err(Error::Canceled).and_then(move |ts| {
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_ts"])
.observe(duration_to_sec(timer.elapsed()));
Ok(ts)
}))
}

pub fn on_reconnect(&self, f: Box<Fn() + Sync + Send + 'static>) {
let mut leader = self.leader.wl();
leader.on_reconnect = Some(f);
}

pub fn request<Req, Resp, F>(&self, req: Req, func: F, retry: usize) -> Request<Req, Resp, F>
where
Req: Clone + Send + 'static,
Resp: Send + 'static,
F: FnMut(&RwLock<LeaderClient>, Req) -> PdFuture<Resp> + Send + 'static,
{
Request::new(req, func, Arc::clone(&self.leader), retry)
}
}

impl PdClient for PdRpcClient {
fn get_cluster_id(&self) -> Result<u64> {
Ok(self.cluster_id)
}
let mut req = request!(self.cluster_id, GetStoreRequest);
req.set_store_id(store_id);

fn get_cluster_config(&self) -> PdFuture<metapb::Cluster> {
unimplemented!()
}
let executor = move |client: &RwLock<LeaderClient>, req: GetStoreRequest| {
let receiver = client
.rl()
.client
.get_store_async_opt(&req, Self::call_option())
.unwrap();
Box::new(receiver.map_err(Error::Grpc).and_then(move |mut resp| {
PD_REQUEST_HISTOGRAM_VEC
.with_label_values(&["get_store"])
.observe(duration_to_sec(timer.elapsed()));
check_resp_header(resp.get_header())?;
Ok(resp.take_store())
})) as PdFuture<_>
};

fn get_store(&self, store_id: u64) -> PdFuture<metapb::Store> {
Box::new(self.get_store_async(store_id).and_then(Ok))
self.request(req, executor, LEADER_CHANGE_RETRY)
}

fn get_region(&self, key: &[u8]) -> PdFuture<metapb::Region> {
Expand All @@ -213,10 +194,10 @@ impl PdClient for PdRpcClient {
fn get_region_by_id(&self, region_id: u64) -> PdFuture<Option<metapb::Region>> {
let timer = Instant::now();

let mut req = request!(self.cluster_id, pdpb::GetRegionByIDRequest);
let mut req = request!(self.cluster_id, GetRegionByIDRequest);
req.set_region_id(region_id);

let executor = move |client: &RwLock<LeaderClient>, req: pdpb::GetRegionByIDRequest| {
let executor = move |client: &RwLock<LeaderClient>, req: GetRegionByIDRequest| {
let handler = client
.rl()
.client
Expand All @@ -235,7 +216,11 @@ impl PdClient for PdRpcClient {
})) as PdFuture<_>
};

self.request(req, executor, LEADER_CHANGE_RETRY).execute()
self.request(req, executor, LEADER_CHANGE_RETRY)
}

fn get_ts(&self) -> PdFuture<PdTimestamp> {
Box::new(self.leader.wl().get_ts())
}
}

Expand Down
Loading