Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a WatchStreamExt trait for stream chaining #899

Merged
merged 13 commits into from
May 11, 2022
4 changes: 0 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ path = "crd_derive_no_schema.rs"
name = "crd_reflector"
path = "crd_reflector.rs"

[[example]]
name = "deployment_reflector"
path = "deployment_reflector.rs"

[[example]]
name = "dynamic_api"
path = "dynamic_api.rs"
Expand Down
4 changes: 1 addition & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ kubectl apply -f configmapgen_controller_object.yaml
and the finalizer example (reconciles a labelled subset of configmaps):

```sh
cargo run --example configmapgen_controller
cargo run --example secret_syncer
kubectl apply -f secret_syncer_configmap.yaml
kubectl delete -f secret_syncer_configmap.yaml
```
Expand All @@ -114,8 +114,6 @@ These examples watch resources as well as ive a store access point:
cargo run --example pod_reflector
# Watch nodes for applied events and current active nodes
cargo run --example node_reflector
# Watch namespace deployments for applied events and current deployments
cargo run --example deployment_reflector
# Watch namespaced secrets for applied events and print secret keys in a task
cargo run --example secret_reflector
# Watch namespaced configmaps for applied events and print store info in task
Expand Down
6 changes: 3 additions & 3 deletions examples/configmap_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{reflector, reflector::Store, utils::try_flatten_applied, watcher},
runtime::{reflector, reflector::Store, watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand Down Expand Up @@ -32,9 +32,9 @@ async fn main() -> anyhow::Result<()> {

spawn_periodic_reader(reader); // read from a reader in the background

let mut applied_events = try_flatten_applied(rf).boxed_local();
let mut applied_events = rf.watch_applies().boxed_local();
while let Some(event) = applied_events.try_next().await? {
info!("Applied {}", event.name())
info!("saw {}", event.name())
}
Ok(())
}
4 changes: 0 additions & 4 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ struct ConfigMapGeneratorSpec {

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Result<Action, Error> {
info!("working hard");
tokio::time::sleep(Duration::from_secs(2)).await;
info!("hard work is done!");

let client = ctx.get_ref().client.clone();

let mut contents = BTreeMap::new();
Expand Down
6 changes: 3 additions & 3 deletions examples/crd_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tracing::*;

use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt},
runtime::{reflector, utils::try_flatten_applied, watcher},
runtime::{reflector, watcher, WatchStreamExt},
Client, CustomResource, CustomResourceExt,
};

Expand Down Expand Up @@ -48,9 +48,9 @@ async fn main() -> anyhow::Result<()> {
info!("Current crds: {:?}", crds);
}
});
let mut rfa = try_flatten_applied(rf).boxed();
let mut rfa = rf.watch_applies().boxed();
while let Some(event) = rfa.try_next().await? {
info!("Applied {}", event.name());
info!("saw {}", event.name());
}
Ok(())
}
42 changes: 0 additions & 42 deletions examples/deployment_reflector.rs

This file was deleted.

24 changes: 13 additions & 11 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::prelude::*;
use futures::{StreamExt, TryStreamExt};
use kube::{
api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt},
discovery,
runtime::{utils::try_flatten_applied, watcher},
discovery::{self, Scope},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -22,17 +22,19 @@ async fn main() -> anyhow::Result<()> {
// Turn them into a GVK
let gvk = GroupVersionKind::gvk(&group, &version, &kind);
// Use API discovery to identify more information about the type (like its plural)
let (ar, _caps) = discovery::pinned_kind(&client, &gvk).await?;
let (ar, caps) = discovery::pinned_kind(&client, &gvk).await?;

// Use the discovered kind in an Api with the ApiResource as its DynamicType
// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);

// Fully compatible with kube-runtime
try_flatten_applied(watcher(api, ListParams::default()))
.try_for_each(|p| async move {
info!("Applied: {}", p.name());
Ok(())
})
.await?;
let mut items = watcher(api, ListParams::default()).watch_applies().boxed();
while let Some(p) = items.try_next().await? {
if caps.scope == Scope::Cluster {
info!("saw {}", p.name());
} else {
info!("saw {} in {}", p.name(), p.namespace().unwrap());
}
}
Ok(())
}
8 changes: 4 additions & 4 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
api::{Api, ListParams},
runtime::{utils::try_flatten_applied, watcher},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client);
let lp = ListParams::default();

let ew = try_flatten_applied(watcher(events, lp));
let ew = watcher(events, lp).watch_applies();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand All @@ -27,8 +27,8 @@ async fn main() -> anyhow::Result<()> {
// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
info!(
"New Event: {} (via {} {})",
ev.message.unwrap(),
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
);
Expand Down
9 changes: 3 additions & 6 deletions examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use kube::{
core::GroupVersionKind,
discovery::{ApiCapabilities, ApiResource, Discovery, Scope},
runtime::{
utils::try_flatten_applied,
wait::{await_condition, conditions::is_deleted},
watcher,
watcher, WatchStreamExt,
},
Client,
};
Expand Down Expand Up @@ -118,10 +117,8 @@ impl App {
if let Some(n) = &self.name {
lp = lp.fields(&format!("metadata.name={}", n));
}
let w = watcher(api, lp);

// present a dumb table for it for now. maybe drop the whole watch. kubectl does not do it anymore.
let mut stream = try_flatten_applied(w).boxed();
// present a dumb table for it for now. kubectl does not do this anymore.
let mut stream = watcher(api, lp).watch_applies().boxed();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be pin_mut! rather than boxed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, i am using it elsewhere. i just don't want to mix pin into the easy go-to example.

println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.creation_timestamp());
Expand Down
8 changes: 4 additions & 4 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use k8s_openapi::api::{
};
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{utils::try_flatten_applied, watcher},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -24,9 +24,9 @@ async fn main() -> anyhow::Result<()> {

// select on applied events from all watchers
let mut combo_stream = stream::select_all(vec![
try_flatten_applied(dep_watcher).map_ok(Watched::Deploy).boxed(),
try_flatten_applied(cm_watcher).map_ok(Watched::Config).boxed(),
try_flatten_applied(sec_watcher).map_ok(Watched::Secret).boxed(),
dep_watcher.watch_applies().map_ok(Watched::Deploy).boxed(),
cm_watcher.watch_applies().map_ok(Watched::Config).boxed(),
sec_watcher.watch_applies().map_ok(Watched::Secret).boxed(),
]);
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
Expand Down
8 changes: 4 additions & 4 deletions examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Node;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{reflector, utils::try_flatten_applied, watcher},
runtime::{reflector, watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> {

let nodes: Api<Node> = Api::all(client.clone());
let lp = ListParams::default()
.labels("beta.kubernetes.io/instance-type=m4.2xlarge") // filter instances by label
.labels("kubernetes.io/arch=amd64") // filter instances by label
.timeout(10); // short watch timeout in this example

let store = reflector::store::Writer::<Node>::default();
Expand All @@ -31,9 +31,9 @@ async fn main() -> anyhow::Result<()> {
});

// Drain and log applied events from the reflector
let mut rfa = try_flatten_applied(rf).boxed();
let mut rfa = rf.watch_applies().boxed();
while let Some(event) = rfa.try_next().await? {
info!("Applied {}", event.name());
info!("saw {}", event.name());
}

Ok(())
Expand Down
16 changes: 6 additions & 10 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{
utils::{try_flatten_applied, StreamBackoff},
watcher,
},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -18,10 +15,10 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let obs = try_flatten_applied(StreamBackoff::new(
watcher(nodes, ListParams::default().labels("beta.kubernetes.io/os=linux")),
ExponentialBackoff::default(),
));
let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64");
let obs = watcher(nodes, lp)
.backoff(ExponentialBackoff::default())
.watch_applies();

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
Expand Down Expand Up @@ -57,8 +54,7 @@ async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result
warn!("Node event: {:?}", serde_json::to_string_pretty(&e)?);
}
} else {
// Turn node_watcher=debug in log to see all
debug!("Healthy node: {}", name);
info!("Healthy node: {}", name);
}
Ok(())
}
7 changes: 4 additions & 3 deletions examples/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::prelude::*;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{utils::try_flatten_applied, watcher},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -13,9 +13,10 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;
let api = Api::<Pod>::default_namespaced(client);

try_flatten_applied(watcher(api, ListParams::default()))
watcher(api, ListParams::default())
.watch_applies()
.try_for_each(|p| async move {
debug!("Applied: {}", p.name());
info!("saw {}", p.name());
if let Some(unready_reason) = pod_unready(&p) {
warn!("{}", unready_reason);
}
Expand Down
6 changes: 3 additions & 3 deletions examples/secret_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Secret;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{reflector, reflector::Store, utils::try_flatten_applied, watcher},
runtime::{reflector, reflector::Store, watcher, WatchStreamExt},
Client,
};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -60,9 +60,9 @@ async fn main() -> anyhow::Result<()> {
let rf = reflector(store, watcher(secrets, lp));
spawn_periodic_reader(reader); // read from a reader in the background

try_flatten_applied(rf)
rf.watch_applies()
.try_for_each(|s| async move {
info!("Applied: {}", s.name());
info!("saw: {}", s.name());
Ok(())
})
.await?;
Expand Down
15 changes: 6 additions & 9 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ enum Error {
type Result<T, E = Error> = std::result::Result<T, E>;

fn secret_name_for_configmap(cm: &ConfigMap) -> Result<String> {
Ok(format!(
"cm---{}",
cm.metadata.name.as_deref().ok_or(Error::NoName)?
))
let name = cm.metadata.name.as_deref().ok_or(Error::NoName)?;
Ok(format!("cmsyncer-{}", name))
}

async fn apply(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Action> {
Expand Down Expand Up @@ -77,17 +75,16 @@ async fn cleanup(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Acti
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let kube = kube::Client::try_default().await?;
let all_cms = kube::Api::<ConfigMap>::all(kube.clone());
let client = kube::Client::try_default().await?;
Controller::new(
all_cms,
Api::<ConfigMap>::all(client.clone()),
ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"),
)
.run(
|cm, _| {
let ns = cm.meta().namespace.as_deref().ok_or(Error::NoNamespace).unwrap();
let cms: Api<ConfigMap> = Api::namespaced(kube.clone(), ns);
let secrets: Api<Secret> = Api::namespaced(kube.clone(), ns);
let cms: Api<ConfigMap> = Api::namespaced(client.clone(), ns);
let secrets: Api<Secret> = Api::namespaced(client.clone(), ns);
async move {
finalizer(
&cms,
Expand Down
Loading