Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: switch to futures 0.3 #447

Merged
merged 6 commits into from Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Expand Up @@ -19,7 +19,7 @@ all-features = true
[dependencies]
grpcio-sys = { path = "grpc-sys", version = "0.5.0" }
libc = "0.2"
futures = "^0.1.15"
futures = "0.3"
protobuf = { version = "2.0", optional = true }
prost = { version = "0.6", optional = true }
bytes = { version = "0.5", optional = true }
Expand All @@ -42,3 +42,6 @@ debug = true

[badges]
travis-ci = { repository = "pingcap/grpc-rs" }

[patch.crates-io]
grpcio-compiler = { path = "compiler", version = "0.5.0", default-features = false }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing \n at end of file

4 changes: 2 additions & 2 deletions benchmark/Cargo.toml
Expand Up @@ -12,13 +12,13 @@ prost-codec = ["grpcio/prost-codec", "grpcio-proto/prost-codec"]
[dependencies]
grpcio = { path = ".." }
grpcio-proto = { path = "../proto", default-features = false }
futures = "0.1"
futures = "0.3"
libc = "0.2"
grpcio-sys = { path = "../grpc-sys" }
rand = "0.7"
rand_distr = "0.2"
rand_xorshift = "0.2"
tokio-timer = "0.1"
futures-timer = "3.0"
clap = "2.23"
log = "0.4"
slog = "2.0"
Expand Down
22 changes: 17 additions & 5 deletions benchmark/src/bench.rs
Expand Up @@ -6,7 +6,7 @@ use std::io::Read;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use futures::{Future, Sink, Stream};
use futures::prelude::*;
use grpc::{
self, ClientStreamingSink, DuplexSink, MessageReader, Method, MethodType, RequestStream,
RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, ServiceBuilder, UnarySink,
Expand Down Expand Up @@ -39,9 +39,16 @@ impl BenchmarkService for Benchmark {
&mut self,
ctx: RpcContext,
stream: RequestStream<SimpleRequest>,
sink: DuplexSink<SimpleResponse>,
mut sink: DuplexSink<SimpleResponse>,
) {
let f = sink.send_all(stream.map(|req| (gen_resp(&req), WriteFlags::default())));
let f = async move {
sink.send_all(
&mut stream.map(|req| req.map(|req| (gen_resp(&req), WriteFlags::default()))),
)
.await?;
sink.close().await?;
Ok(())
};
let keep_running = self.keep_running.clone();
spawn!(ctx, keep_running, "streaming", f)
}
Expand Down Expand Up @@ -90,9 +97,14 @@ impl Generic {
&self,
ctx: &RpcContext,
stream: RequestStream<Vec<u8>>,
sink: DuplexSink<Vec<u8>>,
mut sink: DuplexSink<Vec<u8>>,
) {
let f = sink.send_all(stream.map(|req| (req, WriteFlags::default())));
let f = async move {
sink.send_all(&mut stream.map(|req| req.map(|req| (req, WriteFlags::default()))))
.await?;
sink.close().await?;
Ok(())
};
let keep_running = self.keep_running.clone();
spawn!(ctx, keep_running, "streaming", f)
}
Expand Down
202 changes: 73 additions & 129 deletions benchmark/src/client.rs
Expand Up @@ -6,28 +6,23 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

use futures::future::Loop;
use futures::sync::oneshot::{self, Receiver, Sender};
use futures::{future, Async, Future, Sink, Stream};
use grpc::{
use futures::channel::oneshot::{self, Receiver, Sender};
use futures::prelude::*;
use grpcio::{
CallOption, Channel, ChannelBuilder, Client as GrpcClient, EnvBuilder, Environment, WriteFlags,
};
use grpc_proto::testing::control::{ClientConfig, ClientType, RpcType};
use grpc_proto::testing::messages::SimpleRequest;
use grpc_proto::testing::services_grpc::BenchmarkServiceClient;
use grpc_proto::testing::stats::ClientStats;
use grpc_proto::util as proto_util;
use grpcio_proto::testing::control::{ClientConfig, ClientType, RpcType};
use grpcio_proto::testing::messages::SimpleRequest;
use grpcio_proto::testing::services_grpc::BenchmarkServiceClient;
use grpcio_proto::testing::stats::ClientStats;
use grpcio_proto::util as proto_util;
use rand::{self, SeedableRng};
use rand_distr::{Distribution, Exp};
use rand_xorshift::XorShiftRng;
use tokio_timer::{Sleep, Timer};

use crate::bench;
use crate::error::Error;
use crate::util::{self, CpuRecorder, Histogram};

type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>;

fn gen_req(cfg: &ClientConfig) -> SimpleRequest {
let mut req = SimpleRequest::default();
let payload_config = cfg.get_payload_config();
Expand Down Expand Up @@ -87,7 +82,6 @@ struct ExecutorContext<B> {
keep_running: Arc<AtomicBool>,
histogram: Arc<Mutex<Histogram>>,
backoff: B,
timer: Timer,
_trace: Sender<()>,
}

Expand All @@ -97,15 +91,13 @@ impl<B: Backoff> ExecutorContext<B> {
histogram: Arc<Mutex<Histogram>>,
keep_running: Arc<AtomicBool>,
backoff: B,
timer: Timer,
) -> (ExecutorContext<B>, Receiver<()>) {
let (tx, rx) = oneshot::channel();
(
ExecutorContext {
keep_running,
histogram,
backoff,
timer,
_trace: tx,
},
rx,
Expand All @@ -118,8 +110,8 @@ impl<B: Backoff> ExecutorContext<B> {
his.observe(f);
}

fn backoff_async(&mut self) -> Option<Sleep> {
self.backoff.backoff_time().map(|dur| self.timer.sleep(dur))
fn backoff_async(&mut self) -> Option<futures_timer::Delay> {
self.backoff.backoff_time().map(futures_timer::Delay::new)
}

fn backoff(&mut self) {
Expand Down Expand Up @@ -151,50 +143,36 @@ impl<B: Backoff + Send + 'static> GenericExecutor<B> {
}
}

fn execute_stream(self) {
fn execute_stream(mut self) {
let client = self.client.clone();
let keep_running = self.ctx.keep_running.clone();
let (sender, receiver) = self
let (mut sender, mut receiver) = self
.client
.duplex_streaming(
&bench::METHOD_BENCHMARK_SERVICE_GENERIC_CALL,
CallOption::default(),
)
.unwrap();
let f = future::loop_fn(
(sender, self, receiver),
move |(sender, mut executor, receiver)| {
let f = async move {
loop {
let latency_timer = Instant::now();
let send = sender.send((executor.req.clone(), WriteFlags::default()));
send.map_err(Error::from).and_then(move |sender| {
receiver
.into_future()
.map_err(|(e, _)| Error::from(e))
.and_then(move |(_, r)| {
executor.ctx.observe_latency(latency_timer.elapsed());
let mut time = executor.ctx.backoff_async();
let mut res = Some((sender, executor, r));
future::poll_fn(move || {
if let Some(ref mut t) = time {
try_ready!(t.poll());
}
time.take();
let r = res.take().unwrap();
let l = if r.1.ctx.keep_running() {
Loop::Continue(r)
} else {
Loop::Break(r)
};
Ok(Async::Ready(l))
})
})
})
},
)
.and_then(|(mut s, e, r)| {
future::poll_fn(move || s.close().map_err(Error::from)).map(|_| (e, r))
})
.and_then(|(e, r)| r.into_future().map(|_| e).map_err(|(e, _)| Error::from(e)));
sender
.send((self.req.clone(), WriteFlags::default()))
.await?;
receiver.try_next().await?;
self.ctx.observe_latency(latency_timer.elapsed());
let mut time = self.ctx.backoff_async();
if let Some(t) = &mut time {
t.await;
}
if !self.ctx.keep_running() {
break;
}
}
sender.close().await?;
receiver.try_next().await?;
Ok(())
};
spawn!(client, keep_running, "streaming ping pong", f)
}
}
Expand Down Expand Up @@ -228,74 +206,52 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
});
}

fn execute_unary_async(self) {
fn execute_unary_async(mut self) {
let client = self.client.clone();
let keep_running = self.ctx.keep_running.clone();
let f = future::loop_fn(self, move |mut executor| {
let latency_timer = Instant::now();
let handler = executor.client.unary_call_async(&executor.req).unwrap();

handler.map_err(Error::from).and_then(move |_| {
let f = async move {
loop {
let latency_timer = Instant::now();
self.client.unary_call_async(&self.req)?.await?;
let elapsed = latency_timer.elapsed();
executor.ctx.observe_latency(elapsed);
let mut time = executor.ctx.backoff_async();
let mut res = Some(executor);
future::poll_fn(move || {
if let Some(ref mut t) = time {
try_ready!(t.poll());
}
time.take();
let executor = res.take().unwrap();
let l = if executor.ctx.keep_running() {
Loop::Continue(executor)
} else {
Loop::Break(())
};
Ok(Async::Ready(l))
})
})
});
self.ctx.observe_latency(elapsed);
let mut time = self.ctx.backoff_async();
if let Some(t) = &mut time {
t.await;
}
if !self.ctx.keep_running() {
break;
}
}
Ok(())
};
spawn!(client, keep_running, "unary async", f)
}

fn execute_stream_ping_pong(self) {
fn execute_stream_ping_pong(mut self) {
let client = self.client.clone();
let keep_running = self.ctx.keep_running.clone();
let (sender, receiver) = self.client.streaming_call().unwrap();
let f = future::loop_fn(
(sender, self, receiver),
move |(sender, mut executor, receiver)| {
let (mut sender, mut receiver) = self.client.streaming_call().unwrap();
let f = async move {
loop {
let latency_timer = Instant::now();
let send = sender.send((executor.req.clone(), WriteFlags::default()));
send.map_err(Error::from).and_then(move |sender| {
receiver
.into_future()
.map_err(|(e, _)| Error::from(e))
.and_then(move |(_, r)| {
executor.ctx.observe_latency(latency_timer.elapsed());
let mut time = executor.ctx.backoff_async();
let mut res = Some((sender, executor, r));
future::poll_fn(move || {
if let Some(ref mut t) = time {
try_ready!(t.poll());
}
time.take();
let r = res.take().unwrap();
let l = if r.1.ctx.keep_running() {
Loop::Continue(r)
} else {
Loop::Break(r)
};
Ok(Async::Ready(l))
})
})
})
},
)
.and_then(|(mut s, e, r)| {
future::poll_fn(move || s.close().map_err(Error::from)).map(|_| (e, r))
})
.and_then(|(e, r)| r.into_future().map(|_| e).map_err(|(e, _)| Error::from(e)));
sender
.send((self.req.clone(), WriteFlags::default()))
.await?;
receiver.try_next().await?;
self.ctx.observe_latency(latency_timer.elapsed());
let mut time = self.ctx.backoff_async();
if let Some(t) = &mut time {
t.await;
}
if !self.ctx.keep_running() {
break;
}
}
sender.close().await?;
receiver.try_next().await?;
Ok(())
};
spawn!(client, keep_running, "streaming ping pong", f);
}
}
Expand Down Expand Up @@ -390,26 +346,23 @@ impl Client {
his_param.get_resolution(),
his_param.get_max_possible(),
)));
let timer = Timer::default();
let keep_running = Arc::new(AtomicBool::new(true));
let mut running_reqs = Vec::with_capacity(client_channels * outstanding_rpcs_per_channel);

for ch in channels {
for _ in 0..cfg.get_outstanding_rpcs_per_channel() {
let his = his.clone();
let timer = timer.clone();
let ch = ch.clone();
let rx = if load_params.has_poisson() {
let lambda = load_params.get_poisson().get_offered_load()
/ client_channels as f64
/ outstanding_rpcs_per_channel as f64;
let poisson = Poisson::new(lambda);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), poisson, timer);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), poisson);
execute(ctx, ch, client_type, cfg);
rx
} else {
let (ctx, rx) =
ExecutorContext::new(his, keep_running.clone(), ClosedLoop, timer);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), ClosedLoop);
execute(ctx, ch, client_type, cfg);
rx
};
Expand Down Expand Up @@ -442,18 +395,9 @@ impl Client {
stats
}

pub fn shutdown(&mut self) -> BoxFuture<(), Error> {
pub fn shutdown(&mut self) -> impl Future<Output = ()> + Send {
self.keep_running.store(false, Ordering::Relaxed);
let mut tasks = self.running_reqs.take().unwrap();
let mut idx = tasks.len();
Box::new(future::poll_fn(move || {
while idx > 0 {
if let Ok(Async::NotReady) = tasks[idx - 1].poll() {
return Ok(Async::NotReady);
}
idx -= 1;
}
Ok(Async::Ready(()))
}))
let tasks = self.running_reqs.take().unwrap();
futures::future::join_all(tasks).map(|_| ())
}
}