diff --git a/examples/Cargo.toml b/examples/Cargo.toml index fee65399c..25423cad7 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -93,10 +93,6 @@ path = "crd_derive_no_schema.rs" name = "crd_reflector" path = "crd_reflector.rs" -[[example]] -name = "deployment_reflector" -path = "deployment_reflector.rs" - [[example]] name = "dynamic_api" path = "dynamic_api.rs" diff --git a/examples/README.md b/examples/README.md index f3f633ce2..bbc9d98d6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -99,7 +99,7 @@ kubectl apply -f configmapgen_controller_object.yaml and the finalizer example (reconciles a labelled subset of configmaps): ```sh -cargo run --example configmapgen_controller +cargo run --example secret_syncer kubectl apply -f secret_syncer_configmap.yaml kubectl delete -f secret_syncer_configmap.yaml ``` @@ -114,8 +114,6 @@ These examples watch resources as well as ive a store access point: cargo run --example pod_reflector # Watch nodes for applied events and current active nodes cargo run --example node_reflector -# Watch namespace deployments for applied events and current deployments -cargo run --example deployment_reflector # Watch namespaced secrets for applied events and print secret keys in a task cargo run --example secret_reflector # Watch namespaced configmaps for applied events and print store info in task diff --git a/examples/configmap_reflector.rs b/examples/configmap_reflector.rs index 980ade0d0..b00c9a66e 100644 --- a/examples/configmap_reflector.rs +++ b/examples/configmap_reflector.rs @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::ConfigMap; use kube::{ api::{Api, ListParams, ResourceExt}, - runtime::{reflector, reflector::Store, utils::try_flatten_applied, watcher}, + runtime::{reflector, reflector::Store, watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -32,9 +32,9 @@ async fn main() -> anyhow::Result<()> { spawn_periodic_reader(reader); // read from a reader in the background - let mut applied_events = try_flatten_applied(rf).boxed_local(); + let mut applied_events = rf.watch_applies().boxed_local(); while let Some(event) = applied_events.try_next().await? { - info!("Applied {}", event.name()) + info!("saw {}", event.name()) } Ok(()) } diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs index f27afe05b..baa580dfa 100644 --- a/examples/configmapgen_controller.rs +++ b/examples/configmapgen_controller.rs @@ -30,10 +30,6 @@ struct ConfigMapGeneratorSpec { /// Controller triggers this whenever our main object or our children changed async fn reconcile(generator: Arc, ctx: Context) -> Result { - info!("working hard"); - tokio::time::sleep(Duration::from_secs(2)).await; - info!("hard work is done!"); - let client = ctx.get_ref().client.clone(); let mut contents = BTreeMap::new(); diff --git a/examples/crd_reflector.rs b/examples/crd_reflector.rs index 37a9acd3d..e89727bf2 100644 --- a/examples/crd_reflector.rs +++ b/examples/crd_reflector.rs @@ -4,7 +4,7 @@ use tracing::*; use kube::{ api::{Api, ListParams, Patch, PatchParams, ResourceExt}, - runtime::{reflector, utils::try_flatten_applied, watcher}, + runtime::{reflector, watcher, WatchStreamExt}, Client, CustomResource, CustomResourceExt, }; @@ -48,9 +48,9 @@ async fn main() -> anyhow::Result<()> { info!("Current crds: {:?}", crds); } }); - let mut rfa = try_flatten_applied(rf).boxed(); + let mut rfa = rf.watch_applies().boxed(); while let Some(event) = rfa.try_next().await? { - info!("Applied {}", event.name()); + info!("saw {}", event.name()); } Ok(()) } diff --git a/examples/deployment_reflector.rs b/examples/deployment_reflector.rs deleted file mode 100644 index 96412bdfd..000000000 --- a/examples/deployment_reflector.rs +++ /dev/null @@ -1,42 +0,0 @@ -use futures::{StreamExt, TryStreamExt}; -use k8s_openapi::api::apps::v1::Deployment; -use kube::{ - api::{Api, ListParams, ResourceExt}, - runtime::{reflector, utils::try_flatten_applied, watcher}, - Client, -}; -use tracing::*; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - let client = Client::try_default().await?; - - let store = reflector::store::Writer::::default(); - let reader = store.as_reader(); - let rf = reflector( - store, - watcher( - Api::::default_namespaced(client), - ListParams::default().timeout(10), // short watch timeout in this example - ), - ); - - // We can interact with state in another thread - tokio::spawn(async move { - loop { - // Periodically read our state - let deploys: Vec<_> = reader.state().iter().map(|r| r.name()).collect(); - info!("Current deploys: {:?}", deploys); - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - } - }); - - // We can look at the events we want and use it as a watcher - let mut rfa = try_flatten_applied(rf).boxed(); - while let Some(event) = rfa.try_next().await? { - info!("Applied {}", event.name()); - } - - Ok(()) -} diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index b8f9aa688..3fb493eb5 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -1,8 +1,8 @@ -use futures::prelude::*; +use futures::{StreamExt, TryStreamExt}; use kube::{ api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt}, - discovery, - runtime::{utils::try_flatten_applied, watcher}, + discovery::{self, Scope}, + runtime::{watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -22,17 +22,19 @@ async fn main() -> anyhow::Result<()> { // Turn them into a GVK let gvk = GroupVersionKind::gvk(&group, &version, &kind); // Use API discovery to identify more information about the type (like its plural) - let (ar, _caps) = discovery::pinned_kind(&client, &gvk).await?; + let (ar, caps) = discovery::pinned_kind(&client, &gvk).await?; - // Use the discovered kind in an Api with the ApiResource as its DynamicType + // Use the full resource info to create an Api with the ApiResource as its DynamicType let api = Api::::all_with(client, &ar); // Fully compatible with kube-runtime - try_flatten_applied(watcher(api, ListParams::default())) - .try_for_each(|p| async move { - info!("Applied: {}", p.name()); - Ok(()) - }) - .await?; + let mut items = watcher(api, ListParams::default()).watch_applies().boxed(); + while let Some(p) = items.try_next().await? { + if caps.scope == Scope::Cluster { + info!("saw {}", p.name()); + } else { + info!("saw {} in {}", p.name(), p.namespace().unwrap()); + } + } Ok(()) } diff --git a/examples/event_watcher.rs b/examples/event_watcher.rs index 1b0a9e518..57714abed 100644 --- a/examples/event_watcher.rs +++ b/examples/event_watcher.rs @@ -2,7 +2,7 @@ use futures::{pin_mut, TryStreamExt}; use k8s_openapi::api::core::v1::Event; use kube::{ api::{Api, ListParams}, - runtime::{utils::try_flatten_applied, watcher}, + runtime::{watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> { let events: Api = Api::all(client); let lp = ListParams::default(); - let ew = try_flatten_applied(watcher(events, lp)); + let ew = watcher(events, lp).watch_applies(); pin_mut!(ew); while let Some(event) = ew.try_next().await? { @@ -27,8 +27,8 @@ async fn main() -> anyhow::Result<()> { // This function lets the app handle an added/modified event from k8s fn handle_event(ev: Event) -> anyhow::Result<()> { info!( - "New Event: {} (via {} {})", - ev.message.unwrap(), + "Event: \"{}\" via {} {}", + ev.message.unwrap().trim(), ev.involved_object.kind.unwrap(), ev.involved_object.name.unwrap() ); diff --git a/examples/kubectl.rs b/examples/kubectl.rs index 51552f8fd..be2ac077b 100644 --- a/examples/kubectl.rs +++ b/examples/kubectl.rs @@ -12,9 +12,8 @@ use kube::{ core::GroupVersionKind, discovery::{ApiCapabilities, ApiResource, Discovery, Scope}, runtime::{ - utils::try_flatten_applied, wait::{await_condition, conditions::is_deleted}, - watcher, + watcher, WatchStreamExt, }, Client, }; @@ -118,10 +117,8 @@ impl App { if let Some(n) = &self.name { lp = lp.fields(&format!("metadata.name={}", n)); } - let w = watcher(api, lp); - - // present a dumb table for it for now. maybe drop the whole watch. kubectl does not do it anymore. - let mut stream = try_flatten_applied(w).boxed(); + // present a dumb table for it for now. kubectl does not do this anymore. + let mut stream = watcher(api, lp).watch_applies().boxed(); println!("{0: anyhow::Result<()> { // select on applied events from all watchers let mut combo_stream = stream::select_all(vec![ - try_flatten_applied(dep_watcher).map_ok(Watched::Deploy).boxed(), - try_flatten_applied(cm_watcher).map_ok(Watched::Config).boxed(), - try_flatten_applied(sec_watcher).map_ok(Watched::Secret).boxed(), + dep_watcher.watch_applies().map_ok(Watched::Deploy).boxed(), + cm_watcher.watch_applies().map_ok(Watched::Config).boxed(), + sec_watcher.watch_applies().map_ok(Watched::Secret).boxed(), ]); // SelectAll Stream elements must have the same Item, so all packed in this: #[allow(clippy::large_enum_variant)] diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 719c91ac0..21fa4b850 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::Node; use kube::{ api::{Api, ListParams, ResourceExt}, - runtime::{reflector, utils::try_flatten_applied, watcher}, + runtime::{reflector, watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> { let nodes: Api = Api::all(client.clone()); let lp = ListParams::default() - .labels("beta.kubernetes.io/instance-type=m4.2xlarge") // filter instances by label + .labels("kubernetes.io/arch=amd64") // filter instances by label .timeout(10); // short watch timeout in this example let store = reflector::store::Writer::::default(); @@ -31,9 +31,9 @@ async fn main() -> anyhow::Result<()> { }); // Drain and log applied events from the reflector - let mut rfa = try_flatten_applied(rf).boxed(); + let mut rfa = rf.watch_applies().boxed(); while let Some(event) = rfa.try_next().await? { - info!("Applied {}", event.name()); + info!("saw {}", event.name()); } Ok(()) diff --git a/examples/node_watcher.rs b/examples/node_watcher.rs index 7633bcce5..10e25be8c 100644 --- a/examples/node_watcher.rs +++ b/examples/node_watcher.rs @@ -3,10 +3,7 @@ use futures::{pin_mut, TryStreamExt}; use k8s_openapi::api::core::v1::{Event, Node}; use kube::{ api::{Api, ListParams, ResourceExt}, - runtime::{ - utils::{try_flatten_applied, StreamBackoff}, - watcher, - }, + runtime::{watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -18,10 +15,10 @@ async fn main() -> anyhow::Result<()> { let events: Api = Api::all(client.clone()); let nodes: Api = Api::all(client.clone()); - let obs = try_flatten_applied(StreamBackoff::new( - watcher(nodes, ListParams::default().labels("beta.kubernetes.io/os=linux")), - ExponentialBackoff::default(), - )); + let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64"); + let obs = watcher(nodes, lp) + .backoff(ExponentialBackoff::default()) + .watch_applies(); pin_mut!(obs); while let Some(n) = obs.try_next().await? { @@ -57,8 +54,7 @@ async fn check_for_node_failures(events: &Api, o: Node) -> anyhow::Result warn!("Node event: {:?}", serde_json::to_string_pretty(&e)?); } } else { - // Turn node_watcher=debug in log to see all - debug!("Healthy node: {}", name); + info!("Healthy node: {}", name); } Ok(()) } diff --git a/examples/pod_watcher.rs b/examples/pod_watcher.rs index d64f8467a..a5023690b 100644 --- a/examples/pod_watcher.rs +++ b/examples/pod_watcher.rs @@ -2,7 +2,7 @@ use futures::prelude::*; use k8s_openapi::api::core::v1::Pod; use kube::{ api::{Api, ListParams, ResourceExt}, - runtime::{utils::try_flatten_applied, watcher}, + runtime::{watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -13,9 +13,10 @@ async fn main() -> anyhow::Result<()> { let client = Client::try_default().await?; let api = Api::::default_namespaced(client); - try_flatten_applied(watcher(api, ListParams::default())) + watcher(api, ListParams::default()) + .watch_applies() .try_for_each(|p| async move { - debug!("Applied: {}", p.name()); + info!("saw {}", p.name()); if let Some(unready_reason) = pod_unready(&p) { warn!("{}", unready_reason); } diff --git a/examples/secret_reflector.rs b/examples/secret_reflector.rs index ac04e0bd0..13039c6fa 100644 --- a/examples/secret_reflector.rs +++ b/examples/secret_reflector.rs @@ -2,7 +2,7 @@ use futures::TryStreamExt; use k8s_openapi::api::core::v1::Secret; use kube::{ api::{Api, ListParams, ResourceExt}, - runtime::{reflector, reflector::Store, utils::try_flatten_applied, watcher}, + runtime::{reflector, reflector::Store, watcher, WatchStreamExt}, Client, }; use std::collections::BTreeMap; @@ -60,9 +60,9 @@ async fn main() -> anyhow::Result<()> { let rf = reflector(store, watcher(secrets, lp)); spawn_periodic_reader(reader); // read from a reader in the background - try_flatten_applied(rf) + rf.watch_applies() .try_for_each(|s| async move { - info!("Applied: {}", s.name()); + info!("saw: {}", s.name()); Ok(()) }) .await?; diff --git a/examples/secret_syncer.rs b/examples/secret_syncer.rs index 2c4076cd4..78255b676 100644 --- a/examples/secret_syncer.rs +++ b/examples/secret_syncer.rs @@ -31,10 +31,8 @@ enum Error { type Result = std::result::Result; fn secret_name_for_configmap(cm: &ConfigMap) -> Result { - Ok(format!( - "cm---{}", - cm.metadata.name.as_deref().ok_or(Error::NoName)? - )) + let name = cm.metadata.name.as_deref().ok_or(Error::NoName)?; + Ok(format!("cmsyncer-{}", name)) } async fn apply(cm: Arc, secrets: &kube::Api) -> Result { @@ -77,17 +75,16 @@ async fn cleanup(cm: Arc, secrets: &kube::Api) -> Result anyhow::Result<()> { tracing_subscriber::fmt::init(); - let kube = kube::Client::try_default().await?; - let all_cms = kube::Api::::all(kube.clone()); + let client = kube::Client::try_default().await?; Controller::new( - all_cms, + Api::::all(client.clone()), ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"), ) .run( |cm, _| { let ns = cm.meta().namespace.as_deref().ok_or(Error::NoNamespace).unwrap(); - let cms: Api = Api::namespaced(kube.clone(), ns); - let secrets: Api = Api::namespaced(kube.clone(), ns); + let cms: Api = Api::namespaced(client.clone(), ns); + let secrets: Api = Api::namespaced(client.clone(), ns); async move { finalizer( &cms, diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index d3d4e0a3a..995a0077e 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -8,10 +8,7 @@ use crate::{ ObjectRef, }, scheduler::{scheduler, ScheduleRequest}, - utils::{ - try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle, - KubeRuntimeStreamExt, StreamBackoff, - }, + utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt}, watcher::{self, watcher}, }; use backoff::backoff::Backoff; @@ -465,7 +462,7 @@ where let reader = writer.as_reader(); let mut trigger_selector = stream::SelectAll::new(); let self_watcher = trigger_self( - try_flatten_applied(reflector(writer, watcher(owned_api, lp))), + reflector(writer, watcher(owned_api, lp)).watch_applies(), dyntype.clone(), ) .boxed(); @@ -535,11 +532,7 @@ where where Child::DynamicType: Debug + Eq + Hash + Clone, { - let child_watcher = trigger_owners( - try_flatten_touched(watcher(api, lp)), - self.dyntype.clone(), - dyntype, - ); + let child_watcher = trigger_owners(watcher(api, lp).watch_touches(), self.dyntype.clone(), dyntype); self.trigger_selector.push(child_watcher.boxed()); self } @@ -590,7 +583,7 @@ where I::IntoIter: Send, Other::DynamicType: Clone, { - let other_watcher = trigger_with(try_flatten_touched(watcher(api, lp)), move |obj| { + let other_watcher = trigger_with(watcher(api, lp).watch_touches(), move |obj| { let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase(); mapper(obj) .into_iter() diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 992760eb2..a0db4f44a 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -32,4 +32,5 @@ pub use controller::{applier, Controller}; pub use finalizer::finalizer; pub use reflector::reflector; pub use scheduler::scheduler; +pub use utils::WatchStreamExt; pub use watcher::watcher; diff --git a/kube-runtime/src/utils/event_flatten.rs b/kube-runtime/src/utils/event_flatten.rs new file mode 100644 index 000000000..51914c90c --- /dev/null +++ b/kube-runtime/src/utils/event_flatten.rs @@ -0,0 +1,96 @@ +use crate::watcher::{Error, Event}; +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +use futures::{ready, Stream, TryStream}; +use pin_project::pin_project; + +#[pin_project] +/// Stream returned by the [`watch_applies`](super::WatchStreamExt::watch_applies) and [`watch_touches`](super::WatchStreamExt::watch_touches) method. +#[must_use = "streams do nothing unless polled"] +pub struct EventFlatten { + #[pin] + stream: St, + emit_deleted: bool, + queue: std::vec::IntoIter, +} +impl>, K> EventFlatten { + pub(super) fn new(stream: St, emit_deleted: bool) -> Self { + Self { + stream, + queue: vec![].into_iter(), + emit_deleted, + } + } +} +impl Stream for EventFlatten +where + St: Stream, Error>>, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + Poll::Ready(loop { + if let Some(item) = me.queue.next() { + break Some(Ok(item)); + } + break match ready!(me.stream.as_mut().poll_next(cx)) { + Some(Ok(Event::Applied(obj))) => Some(Ok(obj)), + Some(Ok(Event::Deleted(obj))) => { + if *me.emit_deleted { + Some(Ok(obj)) + } else { + continue; + } + } + Some(Ok(Event::Restarted(objs))) => { + *me.queue = objs.into_iter(); + continue; + } + Some(Err(err)) => Some(Err(err)), + None => return Poll::Ready(None), + }; + }) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use std::task::Poll; + + use super::{Error, Event, EventFlatten}; + use futures::{pin_mut, poll, stream, StreamExt}; + + #[tokio::test] + async fn watches_applies_uses_correct_eventflattened_stream() { + let data = stream::iter([ + Ok(Event::Applied(0)), + Ok(Event::Applied(1)), + Ok(Event::Deleted(0)), + Ok(Event::Applied(2)), + Ok(Event::Restarted(vec![1, 2])), + Err(Error::TooManyObjects), + Ok(Event::Applied(2)), + ]); + let rx = EventFlatten::new(data, false); + pin_mut!(rx); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1))))); + // NB: no Deleted events here + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2))))); + // Restart comes through, currently in reverse order + // (normally on restart they just come in alphabetical order by name) + // this is fine though, alphabetical event order has no functional meaning in watchers + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2))))); + // Error passed through + assert!(matches!( + poll!(rx.next()), + Poll::Ready(Some(Err(Error::TooManyObjects))) + )); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(None))); + } +} diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 669c497b7..5c3e52a3a 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -1,10 +1,13 @@ //! Helpers for manipulating built-in streams mod backoff_reset_timer; +mod event_flatten; mod stream_backoff; +mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; pub use stream_backoff::StreamBackoff; +pub use watch_ext::WatchStreamExt; use crate::watcher; use futures::{ @@ -23,6 +26,10 @@ use stream::IntoStream; use tokio::{runtime::Handle, task::JoinHandle}; /// Flattens each item in the list following the rules of [`watcher::Event::into_iter_applied`]. +#[deprecated( + since = "0.72.0", + note = "fn replaced with the WatchStreamExt::watch_applies which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_applies()` instead. This function will be removed in 0.75.0." +)] pub fn try_flatten_applied>>( stream: S, ) -> impl Stream> { @@ -32,6 +39,10 @@ pub fn try_flatten_applied>>( } /// Flattens each item in the list following the rules of [`watcher::Event::into_iter_touched`]. +#[deprecated( + since = "0.72.0", + note = "fn replaced with the WatchStreamExt::watch_touches which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_touches()` instead. This function will be removed in 0.75.0." +)] pub fn try_flatten_touched>>( stream: S, ) -> impl Stream> { diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs new file mode 100644 index 000000000..4a43f33fc --- /dev/null +++ b/kube-runtime/src/utils/watch_ext.rs @@ -0,0 +1,42 @@ +use crate::{ + utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff}, + watcher, +}; +use backoff::backoff::Backoff; + +use futures::{Stream, TryStream}; + +/// Extension trait for streams returned by [`watcher`] or [`reflector`] +pub trait WatchStreamExt: Stream { + /// Apply a [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] + fn backoff(self, b: B) -> StreamBackoff + where + B: Backoff, + Self: TryStream + Sized, + { + StreamBackoff::new(self, b) + } + + /// Flatten a [`watcher`] stream into a stream of applied objects + /// + /// All Added/Modified events are passed through, and critical errors bubble up. + /// This is functionally equivalent to calling [`try_flatten_applied`] on a [`watcher`]. + fn watch_applies(self) -> EventFlatten + where + Self: Stream, watcher::Error>> + Sized, + { + EventFlatten::new(self, false) + } + + /// Flatten a [`watcher`] stream into a stream of touched objects + /// + /// All Added/Modified/Deleted events are passed through, and critical errors bubble up. + /// This is functionally equivalent to calling [`try_flatten_touched`] on a [`watcher`]. + fn watch_touches(self) -> EventFlatten + where + Self: Stream, watcher::Error>> + Sized, + { + EventFlatten::new(self, true) + } +} +impl WatchStreamExt for St where St: Stream {}