Skip to content

Commit

Permalink
Rename unreleased watcher ext methods slightly
Browse files Browse the repository at this point in the history
as suggested by #899 (review)

Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed May 11, 2022
1 parent beeeb95 commit c57eadb
Show file tree
Hide file tree
Showing 14 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/configmap_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> {

spawn_periodic_reader(reader); // read from a reader in the background

let mut applied_events = rf.watch_applies().boxed_local();
let mut applied_events = rf.applied_objects().boxed_local();
while let Some(event) = applied_events.try_next().await? {
info!("saw {}", event.name())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/crd_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> {
info!("Current crds: {:?}", crds);
}
});
let mut rfa = rf.watch_applies().boxed();
let mut rfa = rf.applied_objects().boxed();
while let Some(event) = rfa.try_next().await? {
info!("saw {}", event.name());
}
Expand Down
2 changes: 1 addition & 1 deletion examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> {
let api = Api::<DynamicObject>::all_with(client, &ar);

// Fully compatible with kube-runtime
let mut items = watcher(api, ListParams::default()).watch_applies().boxed();
let mut items = watcher(api, ListParams::default()).applied_objects().boxed();
while let Some(p) = items.try_next().await? {
if caps.scope == Scope::Cluster {
info!("saw {}", p.name());
Expand Down
2 changes: 1 addition & 1 deletion examples/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client);
let lp = ListParams::default();

let ew = watcher(events, lp).watch_applies();
let ew = watcher(events, lp).applied_objects();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand Down
2 changes: 1 addition & 1 deletion examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl App {
lp = lp.fields(&format!("metadata.name={}", n));
}
// present a dumb table for it for now. kubectl does not do this anymore.
let mut stream = watcher(api, lp).watch_applies().boxed();
let mut stream = watcher(api, lp).applied_objects().boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.creation_timestamp());
Expand Down
6 changes: 3 additions & 3 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ async fn main() -> anyhow::Result<()> {

// select on applied events from all watchers
let mut combo_stream = stream::select_all(vec![
dep_watcher.watch_applies().map_ok(Watched::Deploy).boxed(),
cm_watcher.watch_applies().map_ok(Watched::Config).boxed(),
sec_watcher.watch_applies().map_ok(Watched::Secret).boxed(),
dep_watcher.applied_objects().map_ok(Watched::Deploy).boxed(),
cm_watcher.applied_objects().map_ok(Watched::Config).boxed(),
sec_watcher.applied_objects().map_ok(Watched::Secret).boxed(),
]);
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
Expand Down
2 changes: 1 addition & 1 deletion examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
});

// Drain and log applied events from the reflector
let mut rfa = rf.watch_applies().boxed();
let mut rfa = rf.applied_objects().boxed();
while let Some(event) = rfa.try_next().await? {
info!("saw {}", event.name());
}
Expand Down
2 changes: 1 addition & 1 deletion examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64");
let obs = watcher(nodes, lp)
.backoff(ExponentialBackoff::default())
.watch_applies();
.applied_objects();

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
Expand Down
2 changes: 1 addition & 1 deletion examples/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> {
let api = Api::<Pod>::default_namespaced(client);

watcher(api, ListParams::default())
.watch_applies()
.applied_objects()
.try_for_each(|p| async move {
info!("saw {}", p.name());
if let Some(unready_reason) = pod_unready(&p) {
Expand Down
2 changes: 1 addition & 1 deletion examples/secret_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> {
let rf = reflector(store, watcher(secrets, lp));
spawn_periodic_reader(reader); // read from a reader in the background

rf.watch_applies()
rf.applied_objects()
.try_for_each(|s| async move {
info!("saw: {}", s.name());
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ where
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self(
reflector(writer, watcher(owned_api, lp)).watch_applies(),
reflector(writer, watcher(owned_api, lp)).applied_objects(),
dyntype.clone(),
)
.boxed();
Expand Down Expand Up @@ -532,7 +532,7 @@ where
where
Child::DynamicType: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(watcher(api, lp).watch_touches(), self.dyntype.clone(), dyntype);
let child_watcher = trigger_owners(watcher(api, lp).touched_objects(), self.dyntype.clone(), dyntype);
self.trigger_selector.push(child_watcher.boxed());
self
}
Expand Down Expand Up @@ -583,7 +583,7 @@ where
I::IntoIter: Send,
Other::DynamicType: Clone,
{
let other_watcher = trigger_with(watcher(api, lp).watch_touches(), move |obj| {
let other_watcher = trigger_with(watcher(api, lp).touched_objects(), move |obj| {
let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
mapper(obj)
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::{ready, Stream, TryStream};
use pin_project::pin_project;

#[pin_project]
/// Stream returned by the [`watch_applies`](super::WatchStreamExt::watch_applies) and [`watch_touches`](super::WatchStreamExt::watch_touches) method.
/// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method.
#[must_use = "streams do nothing unless polled"]
pub struct EventFlatten<St, K> {
#[pin]
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::{runtime::Handle, task::JoinHandle};
/// Flattens each item in the list following the rules of [`watcher::Event::into_iter_applied`].
#[deprecated(
since = "0.72.0",
note = "fn replaced with the WatchStreamExt::watch_applies which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_applies()` instead. This function will be removed in 0.75.0."
note = "fn replaced with the WatchStreamExt::applied_objects which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.applied_objects()` instead. This function will be removed in 0.75.0."
)]
pub fn try_flatten_applied<K, S: TryStream<Ok = watcher::Event<K>>>(
stream: S,
Expand All @@ -41,7 +41,7 @@ pub fn try_flatten_applied<K, S: TryStream<Ok = watcher::Event<K>>>(
/// Flattens each item in the list following the rules of [`watcher::Event::into_iter_touched`].
#[deprecated(
since = "0.72.0",
note = "fn replaced with the WatchStreamExt::watch_touches which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_touches()` instead. This function will be removed in 0.75.0."
note = "fn replaced with the WatchStreamExt::touched_objects which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.touched_objects()` instead. This function will be removed in 0.75.0."
)]
pub fn try_flatten_touched<K, S: TryStream<Ok = watcher::Event<K>>>(
stream: S,
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait WatchStreamExt: Stream {
///
/// All Added/Modified events are passed through, and critical errors bubble up.
/// This is functionally equivalent to calling [`try_flatten_applied`] on a [`watcher`].
fn watch_applies<K>(self) -> EventFlatten<Self, K>
fn applied_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
Expand All @@ -32,7 +32,7 @@ pub trait WatchStreamExt: Stream {
///
/// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
/// This is functionally equivalent to calling [`try_flatten_touched`] on a [`watcher`].
fn watch_touches<K>(self) -> EventFlatten<Self, K>
fn touched_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
Expand Down

0 comments on commit c57eadb

Please sign in to comment.