-
Notifications
You must be signed in to change notification settings - Fork 167
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change keys method to use Stream #797
Conversation
Getting back to this: I think If we want to have iterator over keys (which we do), we always will have error on subscription, but we can defer errors from calling @caspervonb WDYT? |
@Jarema like I noted in the comment in the other PR, the double error thing happens all over the place in the KV bucket code and in other parts of the jetstream code too. So if we don't want to return a double error, then we should change it everywhere for consistency. However, I think it is ok to do have the double error as each one indicates something entirely different. The first error says "I couldn't even set up this |
Different classes of errors imo, just that we haven't made them concrete at this time so the API doesn't reflect that very well. |
async-nats/src/jetstream/kv/mod.rs
Outdated
/// # Ok(()) | ||
/// # } | ||
/// ``` | ||
pub async fn keys(&self) -> Result<impl futures::Stream<Item = Result<String, Error>>, Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe wrap this in its own type (Keys
) for consistency, can just have an inner History
and have poll_next implement the filtering.
pub async fn keys(&self) -> Result<impl futures::Stream<Item = Result<String, Error>>, Error> { | |
pub async fn keys(&self) -> Result<Keys, Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think I started it and it was a lot of extra code. Can definitely go back and add it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now done
3cdc319
to
31bafd4
Compare
keys.insert(entry.key); | ||
} | ||
} | ||
self.stream.delete_consumer(&consumer_name).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please note that this is no longer required. The server was cleaning up the consumer just fine, but the bug fixed in #824 was actually causing the problem with duplicate consumers
@caspervonb I created the new |
This is an optional commit that I figured I'd do while I was here. The original documentation for this method suggested that it should be returning a `Stream` rather than an iter. So I implemented a stream version of it if desired. This commit can be popped off (or moved to a separate PR) depending on maintainer preference Signed-off-by: Taylor Thomas <taylor@cosmonic.com>
e79d7d0
to
717c6ed
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last looks @Jarema?
lgtm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This is a follow up to #792 (I'll rebase once that is merged). Please see this comment thread for additional details around the discussion of changing this API: #792 (comment)