Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,39 @@ async fn step<K: Meta + Clone + DeserializeOwned + Send + 'static>(
}
}

/// Watches a Kubernetes Resource for changes
/// Watches a Kubernetes Resource for changes continuously
///
/// Errors are propagated to the client as `Err`. Tries to recover (by reconnecting and resyncing as required)
/// if polled again after an error.
/// Creates an indefinite read stream through continual [`Api::watch`] calls, and keeping track
/// of [returned resource versions](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes).
/// It tries to recover (by reconnecting and resyncing as required) if polled again after an error.
/// However, keep in mind that most terminal `TryStream` combinators (such as `TryFutureExt::try_for_each`
/// and `TryFutureExt::try_concat` will terminate eagerly if an `Error` reaches them.
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`],
/// direct users may want to flatten composite events with [`try_flatten_applied`]:
///
/// ```no_run
/// use kube::{api::{Api, ListParams, Meta}, Client};
/// use kube_runtime::{utils::try_flatten_applied, watcher};
/// use k8s_openapi::api::core::v1::Pod;
/// use futures::{StreamExt, TryStreamExt};
/// #[tokio::main]
/// async fn main() -> Result<(), kube_runtime::watcher::Error> {
/// let client = Client::try_default().await.unwrap();
/// let pods: Api<Pod> = Api::namespaced(client, "apps");
/// let watcher = watcher(pods, ListParams::default());
/// try_flatten_applied(watcher)
/// .try_for_each(|p| async move {
/// println!("Applied: {}", Meta::name(&p));
/// Ok(())
/// })
/// .await?;
/// Ok(())
/// }
/// ```
/// [`try_flatten_applied`]: super::utils::try_flatten_applied
/// [`reflector`]: super::reflector::reflector
/// [`Api::watch`]: https://docs.rs/kube/*/kube/struct.Api.html#method.watch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought rustdoc was supposed to resolve these links automagically these days

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to get a cross-crate link to work :/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to work for tokio-util (https://docs.rs/tokio-util/0.6.0/src/tokio_util/context.rs.html#22).. which direction did you test in? Makes sense that it would be tricker for links from kube -> kube-runtime than the other way around.

Copy link
Member Author

@clux clux Jan 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have the tokio::runtime::Runtime in scope 7 lines up. If we ever re-export kube_runtime from kube under a runtime feature (which I think we should aim to do), then within crate links will work from kube -> kube_runtime.

The other way around has the same problem for the time being.

///
/// # Migration from `kube::runtime`
///
Expand Down
8 changes: 8 additions & 0 deletions kube/src/api/typed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ where
/// This returns a future that awaits the initial response,
/// then you can stream the remaining buffered `WatchEvent` objects.
///
/// Note that a `watch` call can terminate for many reasons (even before the specified
/// [`ListParams::timeout`] is triggered), and will have to be re-issued
/// with the last seen resource version when or if it closes.
///
/// Consider using a managed [`watcher`] to deal with automatic re-watches and error cases.
///
/// ```no_run
/// use kube::{api::{Api, ListParams, Meta, WatchEvent}, Client};
/// use k8s_openapi::api::batch::v1::Job;
Expand All @@ -317,6 +323,8 @@ where
/// Ok(())
/// }
/// ```
/// [`ListParams::timeout`]: super::ListParams::timeout
/// [`watcher`]: https://docs.rs/kube_runtime/*/kube_runtime/watcher/fn.watcher.html
pub async fn watch(
&self,
lp: &ListParams,
Expand Down