-
Notifications
You must be signed in to change notification settings - Fork 556
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow the async connection to have multiple in flight requests #143
Conversation
ce80ce1
to
41a4406
Compare
5ca319e
to
21a1b00
Compare
Rebased on top of #141 so that it works without tokio-uds https://github.com/mitsuhiko/redis-rs/pull/143/files/83e82b87702f3161dc78b11aac514756fae3e7f2..21a1b0007ccaa6acae18a5323c8dbeb32c7de6b4 . Also changed |
cb16409
to
601fd3c
Compare
9a7d88a
to
bcb0cff
Compare
This prevents us from doing small writes when there are multiple messages already ready to be send through the pipeline.
Version 0.2 were released a few weeks ago so we might as well enable it immediately
@badboy Rebased this on top of master. Non-rustfmt changes are at https://github.com/mitsuhiko/redis-rs/pull/143/files/6e8492be1214d831a73628ebdf5d7f2398b14729..a3eefd4b35677df42a05b879388d90c388043c79. I find this PR makes it significantly nicer to work with the async API so I think it would be nice to get in as well before making any release :) (I could maybe extract this outside of this crate as well though so it is not as critical I think). |
Lined it up for review, planning to get to it by the end of the week. |
Could you rebase this? |
@badboy I merged from master to avoid breaking the git dependency I have, can rebase this before merging (or just squash and merge, none of the commits document anything important) |
Works for me! |
@badboy Have you had the time to look at this yet? |
Unfortunately no. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey!
First of all, thanks for patiently waiting for my long-overdue review. This work is really appreciated.
Now I just want to make sure we actually get good code and long-term maintainable (I have to admit I still haven't actually used futures too much).
I already left some small comments inline.
Now to the bigger picture:
I have some trouble following the intention of the code, especially with things like VecDeque<Vec<oneshot::Sender<Result<T::Item, T::Error>>>>
and Vec<oneshot::Sender<Result<T::Item, T::Error>>>
.
Why do we need nested Vec(Deque)s here? Why do we reverse the inner Vec later and pop items off? Is this effectively emulating an ordered stream?
Can you give me a short high-level overview of what the code achieves? I think it then might make sense to include such documentation with the code as well.
Thanks again for your work, it would be great to have these things in the code base.
Cargo.toml
Outdated
@@ -25,11 +25,17 @@ with-system-unix-sockets = [] | |||
sha1 = ">= 0.2, < 0.7" | |||
url = "1.2" | |||
rustc-serialize = { version = "0.3.16", optional = true } | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove these newlines. Also some below.
.unwrap_or(&0)); | ||
println!( | ||
" max-intset: {}", | ||
config.get("set-max-intset-entries").unwrap_or(&0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not too happy about this formatting, but I'm ok with accepting it in this patch. I wonder if we can tell rustfmt to keep the string on the same line as println!
(can be done later)
}; | ||
self.send_result(item); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a newline
src/async.rs
Outdated
match sender.send(item) { | ||
Ok(_) => (), | ||
Err(_) => { | ||
// `Err` means that the receiver were dropped in which case it does not |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the receiver was dropped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
src/async.rs
Outdated
T::Error: ::std::fmt::Debug, | ||
{ | ||
fn new(sink_stream: T) -> Self { | ||
let (sender, receiver) = mpsc::unbounded(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unbounded sounds scary.
Do we have any other limitations to ensure we don't actually grow unbounded?
Or is there any way we could actually have a bounded channel (for some reasonable buffer size?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed, 50 was chosen by a fair dice roll. I think in the long term this could be implemented with https://github.com/tower-rs/tower and we could get this (and other niceties) for free.
src/async.rs
Outdated
Box::new( | ||
self_ | ||
.send((item, senders)) | ||
.map_err(|_| panic!("Unexpected close of redis mpsc sender")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Panics in the middle of code always make me worry.
Under what circumstances will this be hit? How easy is this to trigger?
Is this really unrecoverable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. It was supposed to be unreachable barring programming bugs but it ended up being reachable and should have been mapped to a closed connection. Fixed
src/async.rs
Outdated
let pipeline = match con.con { | ||
ActualConnection::Tcp(tcp) => ActualPipeline::Tcp(Pipeline::new( | ||
ValueCodec::default().framed(tcp.into_inner()), | ||
)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The formatting here is a bit weird.
Can we just move the ActualPipeline
part into a block on the next line?
src/async.rs
Outdated
let self_ = self.0.clone(); | ||
|
||
let mut senders = Vec::new(); | ||
let mut receivers = Vec::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do know the size here, so Vec::with_capacity()
seems like a good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Thought about using https://github.com/servo/rust-smallvec here since 1 element vectors will be common. Seems like premature optimization though.
By reversing the senders vector before it is sent we don't need the extra intermediate step
No actual reason to use an unbounded channel here and its better to have some sort of backpressure. Ideally this should be configurable and this would also apply to the messages currently in flight on the `Sink + Stream` (perhaps with https://github.com/tower-rs/tower if it gets support for this)
I hope I added enough context to the pipeline stuff so it is understandable! |
@Marwes Much better! I give it another look |
I'm sorry it took me another 2 weeks to take a look at it. But I think it's ready to merge now. |
Based on #141 (see 8fba6cf)
This is an extension of #141 where the reading and writing to the socket is done in a separate future which understands how to pipeline requests. This makes it possible to
Clone
theSharedConnection
and do multiple queries concurrently, without any manual pipelining.The downsides are that the connection is notSend
orSync
(could be alleviated with sync types but I am not sure it is worth it) and there is not (currently) a way to run explicit pipelines.The connection is now
Send + Sync