-
-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cache: implement a builder wrapper for reflector #698
Conversation
moves the imports of kube_runtime reflector around a bit first as per comments in #577 then implements a slightly more ergonomic builder struct `Cache<K>` that wraps `reflector`. This encapsulates the writer and result unpacking within the struct, and creates 3 ways to consume it: - `Cache::run(self)` - if wanting to run forever - `Cache::applies(self)` - also exposes flattened applies stream - `Cache::touches(self)` - also exposes flattened touches stream Signed-off-by: clux <sszynrae@gmail.com>
Signed-off-by: clux <sszynrae@gmail.com>
Signed-off-by: clux <sszynrae@gmail.com>
kube-runtime/src/cache/mod.rs
Outdated
@@ -25,6 +29,87 @@ where | |||
stream.inspect_ok(move |event| store.apply_watcher_event(event)) | |||
} | |||
|
|||
/// A simple reflector cache around a store and an owned watcher | |||
pub struct Cache<K> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know, I know we have an education issue, but this feels like it singles out the reflector/cache layer too much for my taste.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me Cache
sounds like a synonym of Store
, not what's effectively a builder for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That said, I'm 👍 on renaming the module to cache
(and maybe renaming Store
to Cache
?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not attached to names here. Happy to let Store
take that name, that seems reasonable to me.
but this feels like it singles out the reflector/cache layer too much for my taste
Yeah, it's not my intention to just single out the reflector
here. I was also hoping to do something to watcher
for the same error issue you raised + the ergonomics issue, but I thought I'd start with this as a place to start and see what you thought.
We could potentially wrap both with one struct: if users call (bad idea upon reflection: impossible to name a thing that does both depending on flags)Cache::store()
we could record events, otherwise we could forward a watcher
's events here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have renamed it to Reflector
for now, but going to park this for a little bit. The watcher
+ backoff
integration needs to be attempted before this so that we can figure out how to best expose that through something higher level. If the watcher
gets a third parameter for this, then even more a case for a helper wrappers on top of it. Will continue after attempting that.
kube-runtime/src/cache/mod.rs
Outdated
/// The [`ListParams`] controls to the possible subset of objects of `K` that you want to cache. | ||
/// For the full set of objects `K` in the given `Api` scope, you can use [`ListParams::default`]. | ||
#[must_use] | ||
pub fn new(api: Api<K>, lp: ListParams) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels like this can just be moved into the ìmpl
block below with a fn
-level where
clause.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how to do that; the writer requires an explicit dyntime
.
kube-runtime/src/cache/mod.rs
Outdated
/// | ||
/// This should be awaited forever. | ||
#[must_use] | ||
pub fn run(self) -> BoxFuture<'static, ()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this Future
boxed rather than just making it an async fn
(or returning an impl Future
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this actually something meaningful that users want? Using the cache without listening for updates is typically indicative of a bug IME.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the time I use a reflector directly, I just want an in-memory, eventually consistent db and not the underlying events. Generally, this would be for something akin to version-rs, i.e. apps that gets their state from kubernetes properties on some data type.
Got one that is just responsible for tracking annotations on pods and doing something unrelated that just happens to need the values of those annotations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed pointless boxing in 6220526 . Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the main concern here that we don't catch errors by doing this? I realised that if there are legit errors then throwing it away here is equally problematic. I have changed this .run()
method to return the first error event from the watcher in d3473ac . Do you think that's enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious about the use-cases here, are any of those uses FOSS, or are you able to go into deeper detail on them?
Got one that is just responsible for tracking annotations on pods and doing something unrelated that just happens to need the values of those annotations.
Wouldn't that typically mean that you'd want the upstream to update when those annotations change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That aside, even when I have wanted an eventually consistent db for one-off reads (in my case, when writing a load balancer) I've usually found Store
a pretty poor fit and built my own "db" on top, since Store
doesn't allow any kind of indexing other than by-name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't that typically mean that you'd want the upstream to update when those annotations change?
The app in question is handling to alertmanager webhooks, and uses pod annotations as a source for who owns the app for incoming alerts. So there's nothing to do inside kubernetes with the information.
Store
could definitely benefit from ways to find entries, but the .get(objectref)
method works well if you know the name + namespace. Plus you can make the memory usage pretty small if you write your own serialization struct that discards properties you don't care about.
kube-runtime/src/cache/mod.rs
Outdated
/// | ||
/// Note that the returned stream is always reflected in the [`reader`](Cache::reader). | ||
/// If you do not require a reader, prefer using a [`watcher`] directly. | ||
pub fn applies(self) -> impl Stream<Item = K> + Send { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name feels pretty awkward.. applied_objects
perhaps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, this feels like it's encouraging people to use reflectors even if they don't need them. I'd rather focus on how we can make try_flatten_*
feel more ergonomic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the utils
are fine by themselves (except possibly naming), I think the problem is that the users have to interact with them. Users generally just want a stream of objects and have a safe default. If we force people to use these utils
on top of watcher::Events
, then we have to teach everyone about the intricacies/failings of the watch api (we keep getting these questions), and I don't think we need to. The abstraction leaks out all the internals and it's too complicated for people who just want to write something quick that just works.
The way I see it is that watcher::Events
should be an implementation detail for reflector
to do atomic cache transitions, and users should not be exposed to them unless they go looking for something advanced.
kube-runtime/src/cache/mod.rs
Outdated
/// Note that the returned stream is always reflected in the [`reader`](Cache::reader). | ||
/// If you do not require a reader, prefer using a [`watcher`] directly. | ||
pub fn applies(self) -> impl Stream<Item = K> + Send { | ||
utils::try_flatten_applied(self.cache).filter_map(|x| async move { x.ok() }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't want to just discard all errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, we advertise the reflector
/ watcher
as a thing you can just run forever and it should deal with the issues, so I think this plays into how we want to handle #577 . The current watcher
and reflector
strategy of just returning a TryStream
and pretending that the user actually does any greater error handling than inserting a ?
at least crashes the app on errors so that kubernetes can do the backoff itself.
If we instead take a backoff
object as a parameter to this struct, then this would be a place to catch errors and restart the watcher. However, by simply restarting (with some backoff within this struct here), we create the potential for a reflector to never initialize in the first place, or start failing continuously from some point (maybe after some rbac changes), and the app would not really know.
I guess my question is, if we are trying to handle #577 ourselves, when/how do we decide to call it quits?
Edit: speculated a bit under flawed assumptions here. Continuing discussions in #577
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, wait. I think I misunderstood you. You want the backoff
to be baked into the watcher
itself for this particular error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Carrying on discussions in #577, but now agree that we should not discard errors here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errors are propagated again in d3473ac .
}; | ||
use serde::de::DeserializeOwned; | ||
use std::{fmt::Debug, hash::Hash}; | ||
pub use store::{Store, Writer}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name here is store::Writer
, not Writer
. If we absolutely want to hide store
then it should probably be reexported as StoreWriter
instead.
Signed-off-by: clux <sszynrae@gmail.com>
Signed-off-by: clux <sszynrae@gmail.com>
Signed-off-by: clux <sszynrae@gmail.com>
- move reflector invocation to where the consumption happens - rename applies -> watch_applies - rename touches -> watch_modifies still in experiment stage. wondering if it's best to just always return a BoxStream in the event listening consumers (avoids importing StreamExt when users inevitably box themselves - unless they use combinators). want to do a thing for watchers as well, once the backoff stuff gets attempted Signed-off-by: clux <sszynrae@gmail.com>
* Add a WatchStreamExt trait for better event chaining From discussion with @teozkr after #698 . This tries to solve the problem in a more generic way using Stream helpers. It was not trivial, and I'm not convinced this is the easiest way to do it, but every other path i tried failed. Signed-off-by: clux <sszynrae@gmail.com> * some slightly better docs and tests Signed-off-by: clux <sszynrae@gmail.com> * move eventflatten to own module to simplify ext Signed-off-by: clux <sszynrae@gmail.com> * update kubectl example to use WatchStreamExt Signed-off-by: clux <sszynrae@gmail.com> * convert watcher examples Signed-off-by: clux <sszynrae@gmail.com> * avoid re-importing futures::ready Signed-off-by: clux <sszynrae@gmail.com> * s/delete/emit_delete Signed-off-by: clux <sszynrae@gmail.com> * remove bad example comment Signed-off-by: clux <sszynrae@gmail.com> * use teo's rewrite queue Signed-off-by: clux <sszynrae@gmail.com> * update examples to all use WatchStreamExt Signed-off-by: clux <sszynrae@gmail.com> * add deprecation warning on try_flatten_* and remove internal use Signed-off-by: clux <sszynrae@gmail.com> * quick test of controllers with some minor tweaks Signed-off-by: clux <sszynrae@gmail.com>
Closing, replaced by #899 |
First moves the imports of
kube_runtime
reflector around a bit first as per comments in #577 (#577 (comment)):runtime::reflector
module renamed toruntime::cache
modulestore
module withincache
made private, but re-exportingWriter
(as well) from the moduleThen we add a slightly more ergonomic builder struct
Cache<K>
that wrapsreflector
.This encapsulates the
Writer
and result unpacking within the struct so that the users need to import less, and generally has a less confusing time browsing docs between 3 modules.The
Cache
has a just a ctor, a getter for the store, and 3 ways to run + consume it:Cache::run(self)
- if wanting to run foreverCache::applies(self)
- also exposes flattened applies streamCache::touches(self)
- also exposes flattened touches streamTODO: (self) expose a helper to determine if the InitListed event has happened (have passed through one event)