Implement flexible PubSub API #136
Originally, pubsub support was only available by creating a client and then asking for a pubsub API. The original `PubSub` type owned a connection. This neatly avoided the issue that only certain commands are available after calling subscribe or psubscribe; however, it's very unfriendly to connection poolers such as r2d2.

This commit makes it so that a borrowed connection may be used for PubSub. Available commands are limited while in pubsub mode by borrowing the connection mutably into a type that only exposes the allowed commands. This API may be accessed by calling `as_pubsub` on a `Connection`. To go back to being able to use *any* command, this wrapper type may be dropped, and the connection should become usable again for other purposes.

Additionally, convenience methods are added to `Connection`: `subscribe` and `psubscribe`. They accept a `ToRedisArgs` and a closure which will be called every time a message is received. The subscription status can be managed via the return value of the closure, which is either `ControlFlow::Continue` or `ControlFlow::Break`.

This is a breaking change; the original PubSub type was removed in favor of this more flexible API and to avoid code bloat from having multiple implementations or additional abstractions to allow an implementation to be shared.

Resolves redis-rs#135.
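The borrowing approach described above can be illustrated with a minimal, self-contained sketch. The `Connection` and `PubSub` types below are hypothetical stand-ins, not the real redis-rs types: commands are recorded in a `Vec` instead of being sent to a server, and `borrow_demo` is an illustrative helper. The point is only that a mutable borrow makes the compiler reject ordinary commands while the wrapper is alive.

```rust
/// Hypothetical stand-in for a connection; it records commands instead of
/// sending them, so the sketch runs without a server.
#[derive(Default)]
pub struct Connection {
    pub sent: Vec<String>,
}

impl Connection {
    /// Any command may be issued on a plain connection.
    pub fn command(&mut self, name: &str) {
        self.sent.push(name.to_string());
    }

    /// Mutably borrow the connection into a wrapper that exposes only
    /// pubsub commands. The borrow ends when the wrapper is dropped.
    pub fn as_pubsub(&mut self) -> PubSub<'_> {
        PubSub { con: self }
    }
}

/// Hypothetical wrapper that only exposes pubsub commands.
pub struct PubSub<'a> {
    con: &'a mut Connection,
}

impl<'a> PubSub<'a> {
    pub fn subscribe(&mut self, channel: &str) {
        self.con.sent.push(format!("SUBSCRIBE {}", channel));
    }
}

impl<'a> Drop for PubSub<'a> {
    fn drop(&mut self) {
        // Leave pubsub mode so the connection is usable again.
        self.con.sent.push("UNSUBSCRIBE".to_string());
    }
}

/// Usage: the connection cannot be touched while `pubsub` is alive.
pub fn borrow_demo() -> Vec<String> {
    let mut con = Connection::default();
    {
        let mut pubsub = con.as_pubsub();
        pubsub.subscribe("news");
        // con.command("GET key"); // would not compile: `con` is mutably borrowed
    } // `pubsub` is dropped here, releasing the borrow
    con.command("PING");
    con.sent
}
```

Because the wrapper holds `&mut Connection` rather than an owned value, a pooled connection can be lent out for pubsub and handed back to the pool afterwards.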
Test failures were "Connection reset by peer" errors. Maybe just needs to be rerun?
Is this close to being merged? If not, anything specifically blocking it?
@dbettin I don't think there's anything blocking it. I wanted to try and fix the tests so it would be green, but that seems like it should be its own effort. We've been running this in production for months now and it works well. Perhaps @mitsuhiko or @badboy can leave a quick review? It's not too large of a PR, but it does have a breaking change for the PubSub API.
I like this.
I have a few small nits, but I think this is in a really good stage already.
I don't worry about the API breakage too much. PubSub was noted as "work in progress" and the upgrade path should be simple (we still might want to document that in the Changelog afterwards).
src/commands.rs
Outdated
pub trait PubSubCommands {
    /// Subscribe to a list of channels using SUBSCRIBE and run the provided
    /// closure for each message received.
    fn subscribe<'a, C, F, U>(&mut self, _: C, _: F) -> RedisResult<U>
It might be good to explicitly state here that this is blocking until `Break` is returned. What do you think?
Good idea, I'll add it.
        C: ToRedisArgs;

    /// Subscribe to a list of channels using PSUBSCRIBE and run the provided
    /// closure for each message received.
As above.
src/connection.rs
Outdated
impl<'a> Drop for PubSub<'a> {
    fn drop(&mut self) {
        let _ = cmd("UNSUBSCRIBE").query::<()>(self.con);
        let _ = cmd("PUNSUBSCRIBE").query::<()>(self.con);
Unsubscribing will trigger a message per channel that's unsubscribed.
Will this cause problems for the underlying connection if it is later reused?
Having a test for this would be great.
It shouldn't because `query` will already expect to receive a result from the server and parse it. I agree that we should have a test for this, including that the connection is no longer subscribed to anything.
As it turns out, a message per channel actually means a distinct message per channel at a protocol level. This will take a bit of work to support. Nice catch!
src/connection.rs
Outdated
        let chan = self.get_channel(&channel);
        let _: () = try!(cmd("SUBSCRIBE").arg(&*chan).query(&self.con));
        self.channels.insert(chan);
        let _: () = try!(cmd("SUBSCRIBE").arg(channel).query(self.con));
While we're at it, can we switch to the postfix question mark operator here (and below)?
Yep, happy to do so. I believe I went with `try!` to be consistent with the rest of the code base.
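For readers unfamiliar with the two styles being discussed, here is a small standalone comparison. It does not use the redis-rs API; `parse_verbose` and `parse_postfix` are hypothetical helpers. The hand-written `match` shows roughly what the old `try!` macro did, and the second function is the same logic with the postfix `?` operator.

```rust
use std::num::ParseIntError;

/// Written out by hand: roughly what `try!(expr)` expanded to.
pub fn parse_verbose(s: &str) -> Result<i32, ParseIntError> {
    let n = match s.parse::<i32>() {
        Ok(v) => v,
        // Return early, converting the error into the function's error type.
        Err(e) => return Err(From::from(e)),
    };
    Ok(n + 1)
}

/// The same logic with the postfix `?` operator.
pub fn parse_postfix(s: &str) -> Result<i32, ParseIntError> {
    let n = s.parse::<i32>()?;
    Ok(n + 1)
}
```

Both functions return the same value or the same error for any input; the `?` form simply reads left to right and chains more cleanly.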
It may not be immediately obvious from the method signature that `PubSubCommands::subscribe` and `PubSubCommands::psubscribe` block, processing each incoming message, until the closure returns `ControlFlow::Break`.
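The blocking behavior can be sketched without a Redis server. In the example below, an `std::sync::mpsc` channel stands in for the subscribed connection; the `subscribe` function, its signature, and `subscribe_demo` are assumptions made for illustration and are simpler than the trait method in this PR. Only the two `ControlFlow` variants are taken from the PR itself.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Mirrors the two variants named in the PR; `Break` carries a value back.
pub enum ControlFlow<U> {
    Continue,
    Break(U),
}

/// Hypothetical message loop: blocks on each message and returns only once
/// the closure breaks (or the source closes, reported here as `None`).
pub fn subscribe<F, U>(messages: &Receiver<String>, mut func: F) -> Option<U>
where
    F: FnMut(String) -> ControlFlow<U>,
{
    loop {
        let msg = messages.recv().ok()?; // blocks until a message arrives
        match func(msg) {
            ControlFlow::Continue => continue,
            ControlFlow::Break(value) => return Some(value),
        }
    }
}

/// Usage: stop after the third message and return the counter.
pub fn subscribe_demo() -> Option<usize> {
    let (tx, rx) = channel();
    for payload in vec!["a", "b", "c", "d"] {
        tx.send(payload.to_string()).unwrap();
    }
    let mut seen = 0;
    let result = subscribe(&rx, |_msg| {
        seen += 1;
        if seen == 3 {
            ControlFlow::Break(seen)
        } else {
            ControlFlow::Continue
        }
    });
    result
}
```

The caller does not regain control until the closure returns `Break`, which is why the review asked for this to be stated in the documentation.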
When a subscribe or unsubscribe command is sent to the server, the connection enters a state where the only valid commands are pubsub commands. To return to the normal state, all subscriptions must be dropped. This is achieved by sending an unsubscribe and punsubscribe command to the server. The server then replies with all of the subscriptions that were dropped. This patch ensures that all messages are received from the server after sending those commands and that the connection object is again usable for non-pubsub activities.

The PubSub API was changed to reflect the fallible nature of this operation. In the event of an error during unsubscribing, the connection is simply lost. In practical terms, this means that entering the PubSub state consumes the connection, and the connection is retrieved through calling `into_inner`. This will likely cause issues with connection pooling libraries since they don't provide ownership.
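A rough sketch of the draining step described above follows. It assumes a simplified reply shape: `Kind`, `Reply`, and `drain_unsubscribe` are hypothetical names, and a `VecDeque` stands in for reading from the socket. The idea is to keep reading until both commands have been answered and the server reports no remaining subscriptions.

```rust
use std::collections::VecDeque;

/// Which command a reply belongs to (a simplification of the real replies).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Kind {
    Unsubscribe,
    PUnsubscribe,
}

/// Hypothetical, simplified reply: its kind plus the number of
/// subscriptions the server reports as still active.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Reply {
    pub kind: Kind,
    pub remaining: u32,
}

/// Consume replies until both commands have been answered and the server
/// reports zero remaining subscriptions. Returns how many replies were read,
/// or an error if the replies run out first.
pub fn drain_unsubscribe(replies: &mut VecDeque<Reply>) -> Result<usize, String> {
    let mut seen_unsub = false;
    let mut seen_punsub = false;
    let mut consumed = 0;
    loop {
        let reply = replies
            .pop_front()
            .ok_or("connection closed while unsubscribing")?;
        consumed += 1;
        match reply.kind {
            Kind::Unsubscribe => seen_unsub = true,
            Kind::PUnsubscribe => seen_punsub = true,
        }
        if seen_unsub && seen_punsub && reply.remaining == 0 {
            return Ok(consumed);
        }
    }
}
```

Returning a `Result` here reflects the point made in the commit message: leaving the pubsub state can fail, so the caller needs a way to learn that the connection may not be usable.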
@badboy I've addressed the issues you pointed to in your review. Please see the description on the final commit for details. One point worth reiterating is that the updated PubSub API now consumes the connection, and it's only returned upon a successful call to
On the flip side, this allows us to guarantee that the connection will be in a usable state when going in and out of PubSub state.
Reflecting a bit on the updated implementation, the ownership requirements imposed are too extreme, and they are contrary to the goals set out at the start of this PR. However, I have another idea about how to implement this. Instead of having PubSub take ownership of connection, we can return to taking a mutable reference. The connection would itself gain a flag that indicates whether it is in the pubsub state. When entering this state, the flag would be raised. On an unsuccessful exit from this state, the flag would remain. Subsequent commands could check this flag and attempt to exit the pubsub state, or else return an appropriate error. This solution seems to be the best of both worlds -- good ergonomics, connection pooling support, and pubsub constraints can be enforced while making it clear that a connection is not usable by returning an appropriate error on subsequent usage if exiting the pubsub state fails. What do you think?
@jwilm I haven't yet fully grasped your new proposal, but wanted to throw in a comment first:
This implementation adds a flag to the `Connection` struct to indicate whether the connection is stuck in the PubSub state after PubSub is dropped. Subsequent commands check this flag and reattempt the unsubscribe routine to put the connection back into a known good state. The main drawback to this implementation is the additional state inside `Connection`.
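The flag-based recovery can be sketched as follows. This is a simplified model, not the code from the PR: the `Connection` here is hypothetical, `fail_next_exit` exists only to simulate a failed unsubscribe, and `req_command` stands in for the real command path.

```rust
use std::cell::Cell;

/// Hypothetical connection carrying the "still in pubsub" flag.
pub struct Connection {
    pub pubsub: Cell<bool>,
    /// Test hook (an assumption of this sketch): makes the next exit fail once.
    pub fail_next_exit: Cell<bool>,
}

impl Connection {
    pub fn new() -> Self {
        Connection {
            pubsub: Cell::new(false),
            fail_next_exit: Cell::new(false),
        }
    }

    /// Raise the flag when entering the pubsub state.
    pub fn enter_pubsub(&self) {
        self.pubsub.set(true);
    }

    /// Try to leave the pubsub state; the flag is only cleared on success.
    pub fn exit_pubsub(&self) -> Result<(), String> {
        if self.fail_next_exit.replace(false) {
            return Err("unsubscribe failed".to_string());
        }
        self.pubsub.set(false);
        Ok(())
    }

    /// Every command first checks the flag and retries the exit if needed.
    pub fn req_command(&self, cmd: &str) -> Result<String, String> {
        if self.pubsub.get() {
            self.exit_pubsub()?;
        }
        Ok(format!("OK {}", cmd))
    }
}
```

If the first exit attempt fails, the next command reports the error and leaves the flag set; a later command retries the exit and proceeds once it succeeds, so the connection never silently runs a normal command while still subscribed.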
@badboy the initial implementation could leave the connection in an unusable state if it encounters an error while running the unsubscribe process. I made the changes proposed in my previous comment. It has the same API as the original proposal, but it ensures that we don't accidentally miss an error in the Drop impl and leave the connection in an unknown state.
I will look into this soon.
No rush, take your time.
A few small nits, otherwise this tends to look really good.
Got a couple of questions remaining:
- Have you tested this code against r2d2 to see whether it can now safely work with those connections in a pool?
- Do we have any chance to test the failure path regarding the new flag? It's probably not easily possible to hit this code path using our current test suite.
src/commands.rs
Outdated
@@ -734,9 +734,108 @@ implement_commands! {
    }
}

/// Allows pubsub callbacks to stop receiving messages.
///
/// Arbitraty data may be returned from `Break`.
- Arbitraty
+ Arbitrary
src/commands.rs
Outdated
    Break(U),
}

/// The PubSub trait allows subscribing to one or more items
Let's stick with the Redis wording and call it `channels` instead of items.
src/commands.rs
Outdated
///
/// Each method handles subscribing to the list of keys, waiting for
/// messages, and unsubscribing from the same list of keys once
/// a ControlFlow::Break is encountered.
Same here. Subscription is to channels, not keys.
src/commands.rs
Outdated
/// // do something with message
/// assert_eq!(msg.get_channel(), Ok(String::from("foo")));
///
/// // incerement messages seen counter
- incerement
+ increment
src/commands.rs
Outdated
/// # Ok(()) }
/// ```
//
// In the future, it would be nice to implement Try such that `?` will work
Maybe add a `TODO:`. Double-slash comments look too similar to doc comments. Making it an explicit TODO helps code readers later.
Also: I'd be curious, did you already try to implement it on nightly?
I did not, but from what I understand about the Try trait, it should be possible.
src/connection.rs
Outdated
    }
    chan
impl<'a> PubSub<'a> {
    pub fn new(con: &'a mut Connection) -> Self {
Do we need to expose this constructor? I'd prefer to only have `as_pubsub` on the connection.
Nope, this was a mistake. Nice catch.
}

impl ConnectionLike for Connection {
    fn req_packed_command(&self, cmd: &[u8]) -> RedisResult<Value> {
        if self.pubsub.get() {
            self.exit_pubsub()?;
        }
This now forces an additional check for each command sent, right? I guess the performance implications are still negligible?
It is an additional check, but I'm guessing the branch predictor will behave favorably here. In our benchmarking of this library at OneSignal, the parser ends up being a much bigger bottleneck than anything else (we have at least 1 PR forthcoming in that regard).
Looking forward to that PR.
There's a PR up for it as of a minute ago.
The only code change here is PubSub::new being made private. This forces users to go through the as_pubsub method on the Connection which was the intended usage anyway.
Feedback addressed; can you take another look?
Note that I replied to one or two of the comments you left which have now been folded by GitHub.
Missed your questions
I have not, but given that we have safeguards to put the connection in the correct state, I don't see it as a problem.
Yes. We could mock a bad server response, but it's more work than I'm personally interested in doing right now.
@badboy is there anything blocking this PR now?
Just me hitting the big green button. Will do so today.