TSO happy code path, work in progress #1
Conversation
Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
Force-pushed from 4530c9e to 25559eb
Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
Force-pushed from a69c7f1 to 9c306e7
src/pd/leader.rs (outdated)
```rust
        /* schedule tso request to run */
        self.schedule(PdTask::TsoRequest);
    }
    Box::new(rx.map_err(Error::Canceled).and_then(move |ts| {
```
You can maybe use impl Future as a return type and avoid the box. Reference: https://tokio.rs/docs/going-deeper/returning/
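For example, a minimal sketch against futures 0.1 (not the actual PdClient code; the function names here are made up):

```rust
extern crate futures;

use futures::{future, Future};

// Boxed version: one heap allocation per call just to erase the future's type.
fn get_ts_boxed() -> Box<dyn Future<Item = u64, Error = ()>> {
    Box::new(future::ok::<u64, ()>(42))
}

// `impl Future` version: the concrete type is hidden from the caller but not
// boxed, so the allocation goes away.
fn get_ts_unboxed() -> impl Future<Item = u64, Error = ()> {
    future::ok::<u64, ()>(42)
}

fn main() {
    assert_eq!(get_ts_boxed().wait(), Ok(42));
    assert_eq!(get_ts_unboxed().wait(), Ok(42));
}
```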
Hm, while changing it to impl Future I found I eventually still had to add Box. Since we are using a trait to define the PdClient interface, and impl Trait isn't possible in trait methods, we still have to use the boxed version here.
So shall we have another discussion about using a trait here? Testability looks like the major benefit of the trait, so do we want to pay the extra cost for it? Or maybe we can change the trait to expose only synchronous versions of the methods and leave the future-returning methods in the concrete implementation.
@Hoverbear I pushed another commit that moves all the Future-related interfaces to the struct and keeps the synchronous versions in the trait. It doesn't look too bad to me.
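Roughly, the shape is like this (a simplified sketch rather than the actual code; `RpcClient` and the method names are only illustrative):

```rust
extern crate futures;

use futures::{future, Future};

type PdTimestamp = (i64, i64);

// The trait exposes only synchronous methods, so it stays easy to mock in
// tests and never needs to name a future type.
trait PdClient {
    fn get_ts_sync(&self) -> PdTimestamp;
}

struct RpcClient;

impl RpcClient {
    // The future-returning method lives on the concrete type, where
    // `impl Future` in return position is allowed.
    fn get_ts(&self) -> impl Future<Item = PdTimestamp, Error = ()> {
        future::ok((0, 1))
    }
}

impl PdClient for RpcClient {
    fn get_ts_sync(&self) -> PdTimestamp {
        self.get_ts().wait().expect("tso request failed")
    }
}

fn main() {
    let client = RpcClient;
    println!("{:?}", client.get_ts_sync());
}
```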
> impl Trait isn't possible for trait
Ohhh, so interesting. It is sad impl trait is so limited. :(
I think using impl trait without the Box::new() can help save an allocation, so I try to favor it where possible.
I think my favorite use is in argument position though for accepting iterables and optionals.
```rust
use std::collections::HashSet;

fn main() {
    println!("{:#?}", unique(vec![123, 1234, 123]).collect::<Vec<u64>>());
    optional_arg(None);
    optional_arg(Some(1));
    optional_arg(1);
}

fn unique(v: impl IntoIterator<Item = u64>) -> impl Iterator<Item = u64> {
    let v = v.into_iter().collect::<HashSet<_>>();
    v.into_iter()
}

fn optional_arg(v: impl Into<Option<u64>>) {
    let _v = v.into().unwrap_or_else(Default::default);
}
```
I just came up with another idea. To avoid confusing people with identical method names that have different return types in the trait and the struct, we could expose the asynchronous interface under a different name, for example with a boxed suffix, and have it return a boxed future instead.
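Something like this, purely as an illustration (not the real trait; `PdFuture` just stands in for a boxed-future alias and `MockPd` is a made-up implementation):

```rust
extern crate futures;

use futures::{future, Future};

type PdTimestamp = (i64, i64);
type PdFuture<T> = Box<dyn Future<Item = T, Error = ()>>;

trait PdClient {
    // Plain name for the synchronous call...
    fn get_ts(&self) -> PdTimestamp;
    // ...and a `boxed` suffix for the asynchronous one, which has to return a
    // boxed future because `impl Trait` is not allowed in trait methods.
    fn get_ts_boxed(&self) -> PdFuture<PdTimestamp>;
}

struct MockPd;

impl PdClient for MockPd {
    fn get_ts(&self) -> PdTimestamp {
        (0, 1)
    }
    fn get_ts_boxed(&self) -> PdFuture<PdTimestamp> {
        Box::new(future::ok(self.get_ts()))
    }
}

fn main() {
    println!("{:?}", MockPd.get_ts_boxed().wait());
}
```

How does that sound?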
Hm, are we considering adding synchronous calls later on? If so, I would prefer to keep an async suffix in the method names, just like the generated gRPC code does. Otherwise we can simply keep using the method names defined in the proto files.
I think if someone wants sync they can just wait() for the values perhaps.
Got it.
@Hoverbear Updated. Please take another look.
Yes! I think this is the best. :) I'm sorry for asking you to change something in a way that turned out to be misleading.
```rust
fn get_ts(&mut self) -> PdFuture<PdTimestamp> {
    let timer = Instant::now();
    let (tx, rx) = oneshot::channel::<PdTimestamp>();
```
You asked me to comment on this. :)
It seems creating this channel is non-trivially expensive:
```rust
#![feature(test)]
extern crate test;
extern crate futures;

use futures::{Future, sync::oneshot};
use test::Bencher;
use std::time::Instant;

#[bench]
fn create_channel(b: &mut Bencher) {
    b.iter(|| {
        oneshot::channel::<(i64, i64)>()
    });
}

#[bench]
fn create_channel_and_send_recieve(b: &mut Bencher) {
    b.iter(|| {
        let (tx, rx) = oneshot::channel::<(i64, i64)>();
        let _ = tx.send((0, 0));
        rx.wait()
    });
}

#[bench]
fn instant(b: &mut Bencher) {
    b.iter(|| {
        Instant::now()
    });
}
```

```
running 3 tests
test create_channel                  ... bench:  96 ns/iter (+/- 4)
test create_channel_and_send_recieve ... bench: 141 ns/iter (+/- 9)
test instant                         ... bench:  17 ns/iter (+/- 2)
```

I was curious and also looked at std::sync::atomic::AtomicIsize, since PdTimestamp is essentially a (i64, i64); it is predictably much faster:
```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicIsize, Ordering};

#[bench]
fn atomics(b: &mut Bencher) {
    b.iter(|| {
        let values = Arc::new((AtomicIsize::new(0), AtomicIsize::new(0)));
        values.0.store(5, Ordering::Relaxed);
        values.1.store(6, Ordering::Relaxed);
        (values.0.load(Ordering::Relaxed), values.1.load(Ordering::Relaxed));
    });
}
```

```
test atomics ... bench: 25 ns/iter (+/- 3)
```
However, I think we would pay more in other places because synchronization becomes harder. A similar problem happens with Once. Since you have better context on the code, though, maybe this gives you an idea I don't see. :)
It's strange that I can't find my earlier comments. Let me write them again.
In order to support request batching and keep responses in order, I used a dedicated thread to run an event loop, like the Go PD client does. I haven't figured out another way to implement TSO without a dispatching thread. Since I already have a thread running, I thought it might be a good idea to dispatch everything through this thread, so there is only one thread interacting with the gRPC channel.
This TSO streaming issue really bothered me a lot; I would like to hear comments from you and everybody else. Let's see if there is a better way.
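To make the idea concrete, the pattern is roughly the following. This is only a simplified, self-contained sketch: the real code talks to the PD TSO gRPC stream, which is replaced here by a local counter, and names such as `TsoDispatcher` are illustrative, not the actual types.

```rust
extern crate futures;

use futures::{sync::oneshot, Future};
use std::sync::mpsc;
use std::thread;

#[derive(Clone, Copy, Debug)]
struct PdTimestamp {
    physical: i64,
    logical: i64,
}

struct TsoDispatcher {
    requests: mpsc::Sender<oneshot::Sender<PdTimestamp>>,
}

impl TsoDispatcher {
    fn new() -> TsoDispatcher {
        let (tx, rx) = mpsc::channel::<oneshot::Sender<PdTimestamp>>();
        // The dedicated thread is the only one that talks to the (simulated)
        // TSO stream, so responses come back in the order requests were queued.
        thread::spawn(move || {
            let mut logical = 0i64;
            while let Ok(first) = rx.recv() {
                // Drain whatever is already queued to form one batch.
                let mut batch = vec![first];
                while let Ok(waiting) = rx.try_recv() {
                    batch.push(waiting);
                }
                // One simulated TSO response answers the whole batch, in order.
                for waiting in batch {
                    logical += 1;
                    let _ = waiting.send(PdTimestamp { physical: 0, logical });
                }
            }
        });
        TsoDispatcher { requests: tx }
    }

    fn get_ts(&self) -> impl Future<Item = PdTimestamp, Error = oneshot::Canceled> {
        let (tx, rx) = oneshot::channel();
        self.requests.send(tx).expect("tso dispatcher thread is gone");
        rx
    }
}

fn main() {
    let dispatcher = TsoDispatcher::new();
    println!("{:?}", dispatcher.get_ts().wait().unwrap());
}
```

Every get_ts call only queues a oneshot sender; the dispatcher thread is the single consumer of the stream, which is what keeps batching and response ordering simple.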
@Hoverbear Do you have any comments on this?
If the current implementation is acceptable, I would like to merge this request so I can start a new merge request for the new code.
Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
Force-pushed from e160d48 to 730eef4
Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
Signed-off-by: Xiaoguang Sun <sunxiaoguang@zhihu.com>
Great!!!
Add 'atomic' parameters to the 'new' method
Basic TSO implementation without comprehensive error handling. @Hoverbear