
Commit e6f2251

Merge pull request #57 from nrc/minor
Remove some code
nrc committed Jun 4, 2019
2 parents 80dbb52 + 8fbbfbf commit e6f2251
Showing 5 changed files with 32 additions and 117 deletions.
src/rpc/client.rs (8 changes: 4 additions & 4 deletions)
@@ -23,7 +23,6 @@ use crate::{
pd::{PdClient, PdTimestamp, Region, RegionId, RegionVerId, Store, StoreId},
security::SecurityManager,
tikv::KvClient,
util::HandyRwLock,
},
Config, Error, Key, KvPair, Result, Value,
};
@@ -44,7 +43,7 @@ impl RpcClientInner {
let env = Arc::new(
EnvBuilder::new()
.cq_count(CQ_COUNT)
.name_prefix(thd_name!(CLIENT_PREFIX))
.name_prefix(thread_name!(CLIENT_PREFIX))
.build(),
);
let security_mgr = Arc::new(
@@ -111,7 +110,7 @@ impl RpcClientInner {
}

fn kv_client(&self, context: RegionContext) -> Result<(RegionContext, Arc<KvClient>)> {
if let Some(conn) = self.tikv.rl().get(context.address()) {
if let Some(conn) = self.tikv.read().unwrap().get(context.address()) {
return Ok((context, Arc::clone(conn)));
};
info!("connect to tikv endpoint: {:?}", context.address());
@@ -124,7 +123,8 @@
)
.map(Arc::new)
.map(|c| {
tikv.wl()
tikv.write()
.unwrap()
.insert(context.address().to_owned(), Arc::clone(&c));
(context, c)
})
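
The hunks above are mechanical: calls through the rl()/wl() shortcuts of the HandyRwLock trait (deleted later in this commit, in src/rpc/util.rs) are spelled out as the standard-library RwLock methods. A minimal sketch of the before/after pattern, with a hypothetical connection map standing in for the real KvClient table:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the tikv connection table held by RpcClientInner.
type ConnMap = Arc<RwLock<HashMap<String, Arc<String>>>>;

fn lookup(conns: &ConnMap, addr: &str) -> Option<Arc<String>> {
    // Was conns.rl().get(addr) via HandyRwLock; unwrap() still panics on a
    // poisoned lock, which is exactly what the removed helper did.
    conns.read().unwrap().get(addr).cloned()
}

fn insert(conns: &ConnMap, addr: &str, conn: Arc<String>) {
    // Was conns.wl().insert(...)
    conns.write().unwrap().insert(addr.to_owned(), conn);
}

fn main() {
    let conns: ConnMap = Arc::new(RwLock::new(HashMap::new()));
    insert(&conns, "127.0.0.1:20160", Arc::new("connection".to_owned()));
    assert!(lookup(&conns, "127.0.0.1:20160").is_some());
}
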
src/rpc/pd/client.rs (18 changes: 8 additions & 10 deletions)
@@ -14,14 +14,12 @@ use kvproto::{metapb, pdpb, pdpb::PdClient as RpcClient};

use crate::{
rpc::{
context::RequestContext,
pd::{
context::{request_context, PdRequestContext},
leader::LeaderClient,
request::Request,
PdTimestamp, Region, RegionId, Store, StoreId,
context::request_context, leader::LeaderClient, request::Request, PdTimestamp, Region,
RegionId, Store, StoreId,
},
security::SecurityManager,
util::HandyRwLock,
},
Error, ErrorKind, Result,
};
@@ -64,7 +62,7 @@ impl PdClient {
timeout: Duration,
) -> Result<PdClient> {
let leader = LeaderClient::connect(env, endpoints, security_mgr, timeout)?;
let cluster_id = leader.rl().cluster_id();
let cluster_id = leader.read().unwrap().cluster_id();

Ok(PdClient {
cluster_id,
@@ -74,7 +72,7 @@ }
}

fn get_leader(&self) -> pdpb::Member {
self.leader.rl().members.get_leader().clone()
self.leader.read().unwrap().members.get_leader().clone()
}

fn get_region_and_leader(
@@ -137,7 +135,7 @@ impl PdClient {

fn execute<Executor, Resp, RpcFuture>(
&self,
mut context: PdRequestContext<Executor>,
mut context: RequestContext<Executor>,
) -> impl Future<Output = Result<Resp>>
where
Executor: FnMut(&RpcClient, CallOption) -> ::grpcio::Result<RpcFuture> + Send + 'static,
@@ -148,7 +146,7 @@
let mut executor = context.executor();
let wrapper = move |cli: &RwLock<LeaderClient>| {
let option = CallOption::default().timeout(timeout);
let cli = &cli.rl().client;
let cli = &cli.read().unwrap().client;
executor(cli, option).unwrap().map(|r| match r {
Err(e) => Err(ErrorKind::Grpc(e))?,
Ok(r) => {
@@ -209,7 +207,7 @@ impl PdClient {
}

pub fn get_ts(&self) -> impl Future<Output = Result<PdTimestamp>> {
self.leader.wl().get_ts()
self.leader.write().unwrap().get_ts()
}
}

src/rpc/pd/context.rs (40 changes: 9 additions & 31 deletions)
@@ -1,44 +1,22 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::ops::{Deref, DerefMut};

use lazy_static::*;
use prometheus::*;

use crate::rpc::context::RequestContext;

pub struct PdRequestContext<Executor> {
target: RequestContext<Executor>,
}

impl<Executor> Deref for PdRequestContext<Executor> {
type Target = RequestContext<Executor>;

fn deref(&self) -> &Self::Target {
&self.target
}
}

impl<Executor> DerefMut for PdRequestContext<Executor> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.target
}
}

pub fn request_context<Executor>(
cmd: &'static str,
executor: Executor,
) -> PdRequestContext<Executor> {
PdRequestContext {
target: RequestContext::new(
cmd,
&PD_REQUEST_DURATION_HISTOGRAM_VEC,
&PD_REQUEST_COUNTER_VEC,
&PD_FAILED_REQUEST_DURATION_HISTOGRAM_VEC,
&PD_FAILED_REQUEST_COUNTER_VEC,
executor,
),
}
) -> RequestContext<Executor> {
RequestContext::new(
cmd,
&PD_REQUEST_DURATION_HISTOGRAM_VEC,
&PD_REQUEST_COUNTER_VEC,
&PD_FAILED_REQUEST_DURATION_HISTOGRAM_VEC,
&PD_FAILED_REQUEST_COUNTER_VEC,
executor,
)
}

pub fn observe_tso_batch(batch_size: usize) -> u32 {
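
The code removed here is a newtype whose only job was to forward to the wrapped RequestContext through Deref/DerefMut; request_context now returns the inner RequestContext<Executor> directly. A simplified sketch of the forwarding-wrapper pattern being deleted, with generic names rather than the real types:

use std::ops::{Deref, DerefMut};

// Reduced shape of the deleted PdRequestContext: a wrapper with no state or
// behaviour of its own beyond forwarding to the value it holds.
struct Forwarding<T> {
    target: T,
}

impl<T> Deref for Forwarding<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.target
    }
}

impl<T> DerefMut for Forwarding<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.target
    }
}

fn main() {
    // Every call resolves through Deref to the inner value, so call sites
    // behave the same if the inner value is returned directly, which is what
    // this commit does for RequestContext.
    let wrapped = Forwarding { target: vec![1, 2, 3] };
    assert_eq!(wrapped.len(), 3);
}
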
src/rpc/pd/leader.rs (17 changes: 8 additions & 9 deletions)
@@ -31,7 +31,6 @@ use crate::{
PdTimestamp,
},
security::SecurityManager,
util::HandyRwLock,
},
Error, Result,
};
@@ -130,10 +129,10 @@ impl PdReactor {

fn init(client: &Arc<RwLock<LeaderClient>>, handle: &OtherHandle) {
let client = Arc::clone(client);
let (tx, rx) = client.wl().client.tso().unwrap();
let (tx, rx) = client.write().unwrap().client.tso().unwrap();
let tx = Compat01As03Sink::new(tx);
let rx = Compat01As03::new(rx);
let tso_rx = client.wl().reactor.tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream
let tso_rx = client.write().unwrap().reactor.tso_rx.take().unwrap(); // Receiver<TsoRequest>: Stream

handle.spawn(
tx.sink_map_err(Into::into)
@@ -157,7 +156,7 @@

handle.spawn(
rx.try_for_each(move |resp| {
let mut client = client.wl();
let mut client = client.write().unwrap();
let reactor = &mut client.reactor;
let tso_pending = reactor.tso_pending.take().unwrap();
reactor.schedule(PdTask::Response(tso_pending, resp));
@@ -173,7 +172,7 @@
}

fn tso_request(client: &Arc<RwLock<LeaderClient>>) {
let mut client = client.wl();
let mut client = client.write().unwrap();
let cluster_id = client.cluster_id;
let reactor = &mut client.reactor;
let mut tso_batch = reactor.tso_buffer.take().unwrap();
@@ -202,7 +201,7 @@ impl PdReactor {
})
.unwrap();
}
client.wl().reactor.tso_buffer = Some(requests);
client.write().unwrap().reactor.tso_buffer = Some(requests);
}

fn dispatch(client: &Arc<RwLock<LeaderClient>>, task: PdTask, handle: &OtherHandle) {
Expand Down Expand Up @@ -263,7 +262,7 @@ impl LeaderClient {
timeout,
}));

client.wl().reactor.start(Arc::clone(&client));
client.write().unwrap().reactor.start(Arc::clone(&client));
Ok(client)
}

@@ -275,7 +274,7 @@
pub fn reconnect(leader: &Arc<RwLock<LeaderClient>>, interval: u64) -> Result<()> {
warn!("updating pd client, blocking the tokio core");
let ((client, members), start) = {
let leader = leader.rl();
let leader = leader.read().unwrap();
if leader.last_update.elapsed() < Duration::from_secs(interval) {
// Avoid unnecessary updating.
return Ok(());
@@ -291,7 +290,7 @@

{
let leader_clone = Arc::clone(leader);
let mut leader = leader.wl();
let mut leader = leader.write().unwrap();
leader.client = client;
leader.members = members;
leader.last_update = Instant::now();
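
reconnect() keeps the same locking shape after the trait removal: a read lock to decide whether the leader info is stale, dropped before a write lock installs the refreshed client. A small sketch of that throttled-update pattern, with a counter standing in for the real LeaderClient state:

use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

// Hypothetical stand-in for the state guarded inside LeaderClient.
struct Leader {
    generation: u64,
    last_update: Instant,
}

fn reconnect(leader: &Arc<RwLock<Leader>>, interval: u64) {
    {
        // Read lock only: skip the update if the last one is recent enough.
        let guard = leader.read().unwrap();
        if guard.last_update.elapsed() < Duration::from_secs(interval) {
            return;
        }
    } // the read guard is dropped before the write lock is taken

    // Write lock to install the refreshed state.
    let mut guard = leader.write().unwrap();
    guard.generation += 1;
    guard.last_update = Instant::now();
}

fn main() {
    let leader = Arc::new(RwLock::new(Leader {
        generation: 0,
        last_update: Instant::now(),
    }));
    reconnect(&leader, 60); // too recent, no update
    assert_eq!(leader.read().unwrap().generation, 0);
    reconnect(&leader, 0); // a zero interval forces the update
    assert_eq!(leader.read().unwrap().generation, 1);
}
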
src/rpc/util.rs (66 changes: 3 additions & 63 deletions)
@@ -1,10 +1,6 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::{
sync::{mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
thread,
time::Duration,
};
use std::{sync::mpsc, thread, time::Duration};

use lazy_static::*;
use tokio_timer::{self, timer::Handle};
@@ -22,31 +18,14 @@ macro_rules! internal_err {
}

/// make a thread name with additional tag inheriting from current thread.
macro_rules! thd_name {
macro_rules! thread_name {
($name:expr) => {{
$crate::rpc::util::get_tag_from_thread_name()
.map(|tag| format!("{}::{}", $name, tag))
.unwrap_or_else(|| $name.to_owned())
}};
}

/// A handy shortcut to replace `RwLock` write/read().unwrap() pattern to
/// shortcut wl and rl.
pub trait HandyRwLock<T> {
fn wl(&self) -> RwLockWriteGuard<T>;
fn rl(&self) -> RwLockReadGuard<T>;
}

impl<T> HandyRwLock<T> for RwLock<T> {
fn wl(&self) -> RwLockWriteGuard<T> {
self.write().unwrap()
}

fn rl(&self) -> RwLockReadGuard<T> {
self.read().unwrap()
}
}

pub fn get_tag_from_thread_name() -> Option<String> {
thread::current()
.name()
@@ -69,7 +48,7 @@ lazy_static! {
fn start_global_timer() -> Handle {
let (tx, rx) = mpsc::channel();
thread::Builder::new()
.name(thd_name!("timer"))
.name(thread_name!("timer"))
.spawn(move || {
let mut timer = tokio_timer::Timer::default();
tx.send(timer.handle()).unwrap();
@@ -83,46 +62,7 @@ fn start_global_timer() -> Handle {

#[cfg(test)]
mod tests {
use super::*;
use futures::compat::Compat01As03;
use std::*;

#[test]
fn test_rwlock_deadlock() {
// If the test runs over 60s, then there is a deadlock.
let mu = RwLock::new(Some(1));
{
let _clone = foo(&mu.rl());
let mut data = mu.wl();
assert!(data.is_some());
*data = None;
}

{
match foo(&mu.rl()) {
Some(_) | None => {
let res = mu.try_write();
assert!(res.is_err());
}
}
}

#[cfg_attr(feature = "cargo-clippy", allow(clippy::clone_on_copy))]
fn foo(a: &Option<usize>) -> Option<usize> {
a.clone()
}
}

#[test]
fn test_internal_error() {
let file_name = file!();
let line_number = line!();
let e = internal_err!("{}", "hi");
assert_eq!(
format!("{}", e),
format!("[{}:{}]: hi", file_name, line_number + 1)
);
}

#[test]
fn test_global_timer() {
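
The rename from thd_name! to thread_name! does not change behaviour: the macro still yields either "<prefix>" or "<prefix>::<tag>", where the tag is taken from the current thread's name. A standalone sketch of that behaviour as plain functions; the tag extractor below is a simplified assumption, since the body of get_tag_from_thread_name is collapsed out of the hunk above:

use std::thread;

// Simplified stand-in for get_tag_from_thread_name: treat whatever follows the
// last "::" in the current thread's name as the tag, and report no tag when
// the name has no "::".
fn tag_from_thread_name() -> Option<String> {
    thread::current()
        .name()
        .and_then(|name| name.rfind("::").map(|i| name[i + 2..].to_owned()))
}

// What thread_name!(prefix) expands to, following the macro body shown above.
fn thread_name(prefix: &str) -> String {
    tag_from_thread_name()
        .map(|tag| format!("{}::{}", prefix, tag))
        .unwrap_or_else(|| prefix.to_owned())
}

fn main() {
    // On the main thread (named "main", no tag) this prints just "timer".
    println!("{}", thread_name("timer"));

    // A worker whose name already carries a tag propagates it into new names.
    thread::Builder::new()
        .name("timer::grpc-1".to_owned())
        .spawn(|| assert_eq!(thread_name("cq"), "cq::grpc-1"))
        .unwrap()
        .join()
        .unwrap();
}
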
