Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: update futures #2326

Merged
merged 3 commits into from
Sep 22, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 37 additions & 46 deletions src/pd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Duration;

use protobuf::RepeatedField;
use futures::{future, Future, Sink, Stream};
use futures::sync::mpsc::{self, UnboundedSender};
use futures::sync::mpsc;
use grpc::{CallOption, EnvBuilder, WriteFlags};
use kvproto::metapb;
use kvproto::pdpb::{self, Member};
Expand Down Expand Up @@ -209,17 +209,14 @@ impl PdClient for RpcClient {
let executor = |client: &RwLock<Inner>, req: pdpb::GetRegionByIDRequest| {
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let handler = client.rl().client.get_region_by_id_async_opt(req, option);
handler
.map_err(Error::Grpc)
.and_then(|mut resp| {
try!(check_resp_header(resp.get_header()));
if resp.has_region() {
Ok(Some(resp.take_region()))
} else {
Ok(None)
}
})
.boxed()
Box::new(handler.map_err(Error::Grpc).and_then(|mut resp| {
try!(check_resp_header(resp.get_header()));
if resp.has_region() {
Ok(Some(resp.take_region()))
} else {
Ok(None)
}
})) as PdFuture<_>
};

self.leader_client
Expand Down Expand Up @@ -248,26 +245,29 @@ impl PdClient for RpcClient {
let sender = match inner.hb_sender {
Either::Left(ref mut sender) => sender.take(),
Either::Right(ref sender) => {
return future::result(
UnboundedSender::send(sender, req).map_err(|e| Error::Other(Box::new(e))),
).boxed()
return Box::new(future::result(
sender
.unbounded_send(req)
.map_err(|e| Error::Other(Box::new(e))),
)) as PdFuture<_>
}
};

match sender {
Some(sender) => {
let (tx, rx) = mpsc::unbounded();
UnboundedSender::send(&tx, req).unwrap();
tx.unbounded_send(req).unwrap();
inner.hb_sender = Either::Right(tx);
sender
.sink_map_err(Error::Grpc)
.send_all(
rx.map_err(|e| {
Error::Other(box_err!("failed to recv heartbeat: {:?}", e))
}).map(|r| (r, WriteFlags::default())),
)
.map(|_| ())
.boxed()
Box::new(
sender
.sink_map_err(Error::Grpc)
.send_all(
rx.map_err(|e| {
Error::Other(box_err!("failed to recv heartbeat: {:?}", e))
}).map(|r| (r, WriteFlags::default())),
)
.map(|_| ()),
) as PdFuture<_>
}
None => unreachable!(),
}
Expand All @@ -293,13 +293,10 @@ impl PdClient for RpcClient {
let executor = |client: &RwLock<Inner>, req: pdpb::AskSplitRequest| {
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let handler = client.rl().client.ask_split_async_opt(req, option);
handler
.map_err(Error::Grpc)
.and_then(|resp| {
try!(check_resp_header(resp.get_header()));
Ok(resp)
})
.boxed()
Box::new(handler.map_err(Error::Grpc).and_then(|resp| {
try!(check_resp_header(resp.get_header()));
Ok(resp)
})) as PdFuture<_>
};

self.leader_client
Expand All @@ -315,13 +312,10 @@ impl PdClient for RpcClient {
let executor = |client: &RwLock<Inner>, req: pdpb::StoreHeartbeatRequest| {
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let handler = client.rl().client.store_heartbeat_async_opt(req, option);
handler
.map_err(Error::Grpc)
.and_then(|resp| {
try!(check_resp_header(resp.get_header()));
Ok(())
})
.boxed()
Box::new(handler.map_err(Error::Grpc).and_then(|resp| {
try!(check_resp_header(resp.get_header()));
Ok(())
})) as PdFuture<_>
};

self.leader_client
Expand All @@ -338,13 +332,10 @@ impl PdClient for RpcClient {
let executor = |client: &RwLock<Inner>, req: pdpb::ReportSplitRequest| {
let option = CallOption::default().timeout(Duration::from_secs(REQUEST_TIMEOUT));
let handler = client.rl().client.report_split_async_opt(req, option);
handler
.map_err(Error::Grpc)
.and_then(|resp| {
try!(check_resp_header(resp.get_header()));
Ok(())
})
.boxed()
Box::new(handler.map_err(Error::Grpc).and_then(|resp| {
try!(check_resp_header(resp.get_header()));
Ok(())
})) as PdFuture<_>
};

self.leader_client
Expand Down
4 changes: 2 additions & 2 deletions src/pd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::BoxFuture;

mod metrics;
mod client;
Expand All @@ -24,9 +23,10 @@ pub use self::util::validate_endpoints;

use kvproto::metapb;
use kvproto::pdpb;
use futures::Future;

pub type Key = Vec<u8>;
pub type PdFuture<T> = BoxFuture<T, Error>;
pub type PdFuture<T> = Box<Future<Item = T, Error = Error> + Send>;

#[derive(Default)]
pub struct RegionStat {
Expand Down
79 changes: 40 additions & 39 deletions src/pd/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Instant;
use std::time::Duration;
use std::collections::HashSet;

use futures::{task, Async, BoxFuture, Future, Poll, Stream};
use futures::{task, Async, Future, Poll, Stream};
use futures::task::Task;
use futures::future::{loop_fn, ok, Loop};
use futures::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -119,11 +119,12 @@ impl LeaderClient {
receiver: None,
inner: self.inner.clone(),
};
recv.for_each(move |resp| {
f(resp);
Ok(())
}).map_err(|e| panic!("unexpected error: {:?}", e))
.boxed()
Box::new(
recv.for_each(move |resp| {
f(resp);
Ok(())
}).map_err(|e| panic!("unexpected error: {:?}", e)),
)
}

pub fn request<Req, Resp, F>(&self, req: Req, f: F, retry: usize) -> Request<Req, Resp, F>
Expand Down Expand Up @@ -206,11 +207,11 @@ where
Resp: Send + 'static,
F: FnMut(&RwLock<Inner>, Req) -> PdFuture<Resp> + Send + 'static,
{
fn reconnect_if_needed(mut self) -> BoxFuture<Self, Self> {
fn reconnect_if_needed(mut self) -> Box<Future<Item = Self, Error = Self> + Send> {
debug!("reconnect remains: {}", self.reconnect_count);

if self.request_sent < MAX_REQUEST_COUNT {
return ok(self).boxed();
return Box::new(ok(self));
}

// Updating client.
Expand All @@ -221,41 +222,40 @@ where
match self.client.reconnect() {
Ok(_) => {
self.request_sent = 0;
ok(self).boxed()
Box::new(ok(self))
}
Err(_) => self.client
.timer
.sleep(Duration::from_secs(RECONNECT_INTERVAL_SEC))
.then(|_| Err(self))
.boxed(),
Err(_) => Box::new(
self.client
.timer
.sleep(Duration::from_secs(RECONNECT_INTERVAL_SEC))
.then(|_| Err(self)),
),
}
}

fn send_and_receive(mut self) -> BoxFuture<Self, Self> {
fn send_and_receive(mut self) -> Box<Future<Item = Self, Error = Self> + Send> {
self.request_sent += 1;
debug!("request sent: {}", self.request_sent);
let r = self.req.clone();

ok(self)
.and_then(|mut ctx| {
ctx.timer = Some(PD_SEND_MSG_HISTOGRAM.start_coarse_timer());
let req = (ctx.func)(&ctx.client.inner, r);
req.then(|resp| {
// Observe on dropping, schedule time will be recorded too.
ctx.timer.take();
match resp {
Ok(resp) => {
ctx.resp = Some(Ok(resp));
Ok(ctx)
}
Err(err) => {
error!("request failed: {:?}", err);
Err(ctx)
}
Box::new(ok(self).and_then(|mut ctx| {
ctx.timer = Some(PD_SEND_MSG_HISTOGRAM.start_coarse_timer());
let req = (ctx.func)(&ctx.client.inner, r);
req.then(|resp| {
// Observe on dropping, schedule time will be recorded too.
ctx.timer.take();
match resp {
Ok(resp) => {
ctx.resp = Some(Ok(resp));
Ok(ctx)
}
})
Err(err) => {
error!("request failed: {:?}", err);
Err(ctx)
}
}
})
.boxed()
}))
}

fn break_or_continue(ctx: result::Result<Self, Self>) -> Result<Loop<Self, Self>> {
Expand All @@ -279,12 +279,13 @@ where
/// is resolved successfully, otherwise it repeats `retry` times.
pub fn execute(self) -> PdFuture<Resp> {
let ctx = self;
loop_fn(ctx, |ctx| {
ctx.reconnect_if_needed()
.and_then(Self::send_and_receive)
.then(Self::break_or_continue)
}).then(Self::post_loop)
.boxed()
Box::new(
loop_fn(ctx, |ctx| {
ctx.reconnect_if_needed()
.and_then(Self::send_and_receive)
.then(Self::break_or_continue)
}).then(Self::post_loop),
)
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/server/raft_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ impl Conn {
sink.sink_map_err(Error::from)
.send_all(
rx.map(|msgs: Vec<(RaftMessage, WriteFlags)>| {
stream::iter::<_, _, ()>(msgs.into_iter().map(Ok))
stream::iter_ok(msgs)
}).flatten()
.map_err(|_| Error::Sink),
.map_err(|()| Error::Sink),
)
.then(move |r| {
alive.store(false, Ordering::SeqCst);
Expand Down Expand Up @@ -150,7 +150,7 @@ impl RaftClient {

let mut msgs = conn.buffer.take().unwrap();
msgs.last_mut().unwrap().1 = WriteFlags::default();
if let Err(e) = UnboundedSender::send(&conn.stream, msgs) {
if let Err(e) = conn.stream.unbounded_send(msgs) {
error!(
"server: drop conn with tikv endpoint {} flush conn error: {:?}",
addr,
Expand Down
1 change: 1 addition & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
snap_worker.scheduler(),
);
let addr = try!(SocketAddr::from_str(&cfg.addr));
info!("listening on {}", addr);
let ip = format!("{}", addr.ip());
let channel_args = ChannelBuilder::new(env.clone())
.stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as usize)
Expand Down
Loading