Skip to content

Commit

Permalink
Add a WatchStreamExt trait for stream chaining (#899)
Browse files Browse the repository at this point in the history
* Add a WatchStreamExt trait for better event chaining

From discussion with @teozkr after #698 .
This tries to solve the problem in a more generic way using Stream
helpers.

It was not trivial, and I'm not convinced this is the easiest way to do
it, but every other path i tried failed.

Signed-off-by: clux <sszynrae@gmail.com>

* some slightly better docs and tests

Signed-off-by: clux <sszynrae@gmail.com>

* move eventflatten to own module to simplify ext

Signed-off-by: clux <sszynrae@gmail.com>

* update kubectl example to use WatchStreamExt

Signed-off-by: clux <sszynrae@gmail.com>

* convert watcher examples

Signed-off-by: clux <sszynrae@gmail.com>

* avoid re-importing futures::ready

Signed-off-by: clux <sszynrae@gmail.com>

* s/delete/emit_delete

Signed-off-by: clux <sszynrae@gmail.com>

* remove bad example comment

Signed-off-by: clux <sszynrae@gmail.com>

* use teo's rewrite queue

Signed-off-by: clux <sszynrae@gmail.com>

* update examples to all use WatchStreamExt

Signed-off-by: clux <sszynrae@gmail.com>

* add deprecation warning on try_flatten_* and remove internal use

Signed-off-by: clux <sszynrae@gmail.com>

* quick test of controllers with some minor tweaks

Signed-off-by: clux <sszynrae@gmail.com>
  • Loading branch information
clux committed May 11, 2022
1 parent 2d4a37d commit beeeb95
Show file tree
Hide file tree
Showing 20 changed files with 208 additions and 124 deletions.
4 changes: 0 additions & 4 deletions examples/Cargo.toml
Expand Up @@ -93,10 +93,6 @@ path = "crd_derive_no_schema.rs"
name = "crd_reflector"
path = "crd_reflector.rs"

[[example]]
name = "deployment_reflector"
path = "deployment_reflector.rs"

[[example]]
name = "dynamic_api"
path = "dynamic_api.rs"
Expand Down
4 changes: 1 addition & 3 deletions examples/README.md
Expand Up @@ -99,7 +99,7 @@ kubectl apply -f configmapgen_controller_object.yaml
and the finalizer example (reconciles a labelled subset of configmaps):

```sh
cargo run --example configmapgen_controller
cargo run --example secret_syncer
kubectl apply -f secret_syncer_configmap.yaml
kubectl delete -f secret_syncer_configmap.yaml
```
Expand All @@ -114,8 +114,6 @@ These examples watch resources as well as ive a store access point:
cargo run --example pod_reflector
# Watch nodes for applied events and current active nodes
cargo run --example node_reflector
# Watch namespace deployments for applied events and current deployments
cargo run --example deployment_reflector
# Watch namespaced secrets for applied events and print secret keys in a task
cargo run --example secret_reflector
# Watch namespaced configmaps for applied events and print store info in task
Expand Down
6 changes: 3 additions & 3 deletions examples/configmap_reflector.rs
Expand Up @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{reflector, reflector::Store, utils::try_flatten_applied, watcher},
runtime::{reflector, reflector::Store, watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand Down Expand Up @@ -32,9 +32,9 @@ async fn main() -> anyhow::Result<()> {

spawn_periodic_reader(reader); // read from a reader in the background

let mut applied_events = try_flatten_applied(rf).boxed_local();
let mut applied_events = rf.watch_applies().boxed_local();
while let Some(event) = applied_events.try_next().await? {
info!("Applied {}", event.name())
info!("saw {}", event.name())
}
Ok(())
}
4 changes: 0 additions & 4 deletions examples/configmapgen_controller.rs
Expand Up @@ -30,10 +30,6 @@ struct ConfigMapGeneratorSpec {

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Result<Action, Error> {
info!("working hard");
tokio::time::sleep(Duration::from_secs(2)).await;
info!("hard work is done!");

let client = ctx.get_ref().client.clone();

let mut contents = BTreeMap::new();
Expand Down
6 changes: 3 additions & 3 deletions examples/crd_reflector.rs
Expand Up @@ -4,7 +4,7 @@ use tracing::*;

use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt},
runtime::{reflector, utils::try_flatten_applied, watcher},
runtime::{reflector, watcher, WatchStreamExt},
Client, CustomResource, CustomResourceExt,
};

Expand Down Expand Up @@ -48,9 +48,9 @@ async fn main() -> anyhow::Result<()> {
info!("Current crds: {:?}", crds);
}
});
let mut rfa = try_flatten_applied(rf).boxed();
let mut rfa = rf.watch_applies().boxed();
while let Some(event) = rfa.try_next().await? {
info!("Applied {}", event.name());
info!("saw {}", event.name());
}
Ok(())
}
42 changes: 0 additions & 42 deletions examples/deployment_reflector.rs

This file was deleted.

24 changes: 13 additions & 11 deletions examples/dynamic_watcher.rs
@@ -1,8 +1,8 @@
use futures::prelude::*;
use futures::{StreamExt, TryStreamExt};
use kube::{
api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt},
discovery,
runtime::{utils::try_flatten_applied, watcher},
discovery::{self, Scope},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -22,17 +22,19 @@ async fn main() -> anyhow::Result<()> {
// Turn them into a GVK
let gvk = GroupVersionKind::gvk(&group, &version, &kind);
// Use API discovery to identify more information about the type (like its plural)
let (ar, _caps) = discovery::pinned_kind(&client, &gvk).await?;
let (ar, caps) = discovery::pinned_kind(&client, &gvk).await?;

// Use the discovered kind in an Api with the ApiResource as its DynamicType
// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);

// Fully compatible with kube-runtime
try_flatten_applied(watcher(api, ListParams::default()))
.try_for_each(|p| async move {
info!("Applied: {}", p.name());
Ok(())
})
.await?;
let mut items = watcher(api, ListParams::default()).watch_applies().boxed();
while let Some(p) = items.try_next().await? {
if caps.scope == Scope::Cluster {
info!("saw {}", p.name());
} else {
info!("saw {} in {}", p.name(), p.namespace().unwrap());
}
}
Ok(())
}
8 changes: 4 additions & 4 deletions examples/event_watcher.rs
Expand Up @@ -2,7 +2,7 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
api::{Api, ListParams},
runtime::{utils::try_flatten_applied, watcher},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client);
let lp = ListParams::default();

let ew = try_flatten_applied(watcher(events, lp));
let ew = watcher(events, lp).watch_applies();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand All @@ -27,8 +27,8 @@ async fn main() -> anyhow::Result<()> {
// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
info!(
"New Event: {} (via {} {})",
ev.message.unwrap(),
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
);
Expand Down
9 changes: 3 additions & 6 deletions examples/kubectl.rs
Expand Up @@ -12,9 +12,8 @@ use kube::{
core::GroupVersionKind,
discovery::{ApiCapabilities, ApiResource, Discovery, Scope},
runtime::{
utils::try_flatten_applied,
wait::{await_condition, conditions::is_deleted},
watcher,
watcher, WatchStreamExt,
},
Client,
};
Expand Down Expand Up @@ -118,10 +117,8 @@ impl App {
if let Some(n) = &self.name {
lp = lp.fields(&format!("metadata.name={}", n));
}
let w = watcher(api, lp);

// present a dumb table for it for now. maybe drop the whole watch. kubectl does not do it anymore.
let mut stream = try_flatten_applied(w).boxed();
// present a dumb table for it for now. kubectl does not do this anymore.
let mut stream = watcher(api, lp).watch_applies().boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.creation_timestamp());
Expand Down
8 changes: 4 additions & 4 deletions examples/multi_watcher.rs
Expand Up @@ -5,7 +5,7 @@ use k8s_openapi::api::{
};
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{utils::try_flatten_applied, watcher},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -24,9 +24,9 @@ async fn main() -> anyhow::Result<()> {

// select on applied events from all watchers
let mut combo_stream = stream::select_all(vec![
try_flatten_applied(dep_watcher).map_ok(Watched::Deploy).boxed(),
try_flatten_applied(cm_watcher).map_ok(Watched::Config).boxed(),
try_flatten_applied(sec_watcher).map_ok(Watched::Secret).boxed(),
dep_watcher.watch_applies().map_ok(Watched::Deploy).boxed(),
cm_watcher.watch_applies().map_ok(Watched::Config).boxed(),
sec_watcher.watch_applies().map_ok(Watched::Secret).boxed(),
]);
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
Expand Down
8 changes: 4 additions & 4 deletions examples/node_reflector.rs
Expand Up @@ -2,7 +2,7 @@ use futures::{StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::Node;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{reflector, utils::try_flatten_applied, watcher},
runtime::{reflector, watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> {

let nodes: Api<Node> = Api::all(client.clone());
let lp = ListParams::default()
.labels("beta.kubernetes.io/instance-type=m4.2xlarge") // filter instances by label
.labels("kubernetes.io/arch=amd64") // filter instances by label
.timeout(10); // short watch timeout in this example

let store = reflector::store::Writer::<Node>::default();
Expand All @@ -31,9 +31,9 @@ async fn main() -> anyhow::Result<()> {
});

// Drain and log applied events from the reflector
let mut rfa = try_flatten_applied(rf).boxed();
let mut rfa = rf.watch_applies().boxed();
while let Some(event) = rfa.try_next().await? {
info!("Applied {}", event.name());
info!("saw {}", event.name());
}

Ok(())
Expand Down
16 changes: 6 additions & 10 deletions examples/node_watcher.rs
Expand Up @@ -3,10 +3,7 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{
utils::{try_flatten_applied, StreamBackoff},
watcher,
},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -18,10 +15,10 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let obs = try_flatten_applied(StreamBackoff::new(
watcher(nodes, ListParams::default().labels("beta.kubernetes.io/os=linux")),
ExponentialBackoff::default(),
));
let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64");
let obs = watcher(nodes, lp)
.backoff(ExponentialBackoff::default())
.watch_applies();

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
Expand Down Expand Up @@ -57,8 +54,7 @@ async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result
warn!("Node event: {:?}", serde_json::to_string_pretty(&e)?);
}
} else {
// Turn node_watcher=debug in log to see all
debug!("Healthy node: {}", name);
info!("Healthy node: {}", name);
}
Ok(())
}
7 changes: 4 additions & 3 deletions examples/pod_watcher.rs
Expand Up @@ -2,7 +2,7 @@ use futures::prelude::*;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{utils::try_flatten_applied, watcher},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -13,9 +13,10 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;
let api = Api::<Pod>::default_namespaced(client);

try_flatten_applied(watcher(api, ListParams::default()))
watcher(api, ListParams::default())
.watch_applies()
.try_for_each(|p| async move {
debug!("Applied: {}", p.name());
info!("saw {}", p.name());
if let Some(unready_reason) = pod_unready(&p) {
warn!("{}", unready_reason);
}
Expand Down
6 changes: 3 additions & 3 deletions examples/secret_reflector.rs
Expand Up @@ -2,7 +2,7 @@ use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Secret;
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{reflector, reflector::Store, utils::try_flatten_applied, watcher},
runtime::{reflector, reflector::Store, watcher, WatchStreamExt},
Client,
};
use std::collections::BTreeMap;
Expand Down Expand Up @@ -60,9 +60,9 @@ async fn main() -> anyhow::Result<()> {
let rf = reflector(store, watcher(secrets, lp));
spawn_periodic_reader(reader); // read from a reader in the background

try_flatten_applied(rf)
rf.watch_applies()
.try_for_each(|s| async move {
info!("Applied: {}", s.name());
info!("saw: {}", s.name());
Ok(())
})
.await?;
Expand Down
15 changes: 6 additions & 9 deletions examples/secret_syncer.rs
Expand Up @@ -31,10 +31,8 @@ enum Error {
type Result<T, E = Error> = std::result::Result<T, E>;

fn secret_name_for_configmap(cm: &ConfigMap) -> Result<String> {
Ok(format!(
"cm---{}",
cm.metadata.name.as_deref().ok_or(Error::NoName)?
))
let name = cm.metadata.name.as_deref().ok_or(Error::NoName)?;
Ok(format!("cmsyncer-{}", name))
}

async fn apply(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Action> {
Expand Down Expand Up @@ -77,17 +75,16 @@ async fn cleanup(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Acti
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let kube = kube::Client::try_default().await?;
let all_cms = kube::Api::<ConfigMap>::all(kube.clone());
let client = kube::Client::try_default().await?;
Controller::new(
all_cms,
Api::<ConfigMap>::all(client.clone()),
ListParams::default().labels("configmap-secret-syncer.nullable.se/sync=true"),
)
.run(
|cm, _| {
let ns = cm.meta().namespace.as_deref().ok_or(Error::NoNamespace).unwrap();
let cms: Api<ConfigMap> = Api::namespaced(kube.clone(), ns);
let secrets: Api<Secret> = Api::namespaced(kube.clone(), ns);
let cms: Api<ConfigMap> = Api::namespaced(client.clone(), ns);
let secrets: Api<Secret> = Api::namespaced(client.clone(), ns);
async move {
finalizer(
&cms,
Expand Down

0 comments on commit beeeb95

Please sign in to comment.