diff --git a/Cargo.lock b/Cargo.lock index ad2a429..d6192ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,7 +173,7 @@ version = "3.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da95d038ede1a964ce99f49cbe27a7fb538d1da595e4b4f70b8c8f338d17bf16" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro-error", "proc-macro2", "quote", @@ -283,6 +283,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dyn-clone" version = "1.0.5" @@ -409,6 +415,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.21" @@ -486,6 +498,7 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures 0.1.31", "futures-channel", "futures-core", "futures-io", @@ -528,6 +541,15 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.0" @@ -650,9 +672,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" +checksum = "0f647032dfaa1f8b6dc29bd3edb7bbef4861b8b8007ebb118d6db284fd59f6ee" dependencies = [ "autocfg", "hashbrown", @@ -754,7 +776,7 @@ dependencies = [ "chrono", "dirs-next", "either", - "futures", + "futures 0.3.21", "http", "http-body", "hyper", @@ -819,7 +841,7 @@ dependencies = [ "ahash", "backoff", "derivative", - "futures", + "futures 0.3.21", "json-patch", "k8s-openapi", "kube-client", @@ -1465,6 +1487,28 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "snafu" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eba135d2c579aa65364522eb78590cdf703176ef71ad4c32b00f58f7afb2df5" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a7fe9b0669ef117c5cabc5549638528f36771f058ff977d7689deb517833a75" +dependencies = [ + "heck 0.3.3", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "socket2" version = "0.4.4" @@ -1491,10 +1535,14 @@ dependencies = [ "anyhow", "built", "clap", + "futures 0.3.21", "serde", + "serde_json", "serde_yaml", + "snafu", "stackable-commons-crd", "stackable-operator", + "strum", "tokio", ] @@ -1509,7 +1557,7 @@ dependencies = [ "const_format", "derivative", "either", - "futures", + "futures 0.3.21", "json-patch", "k8s-openapi", "kube", @@ -1549,7 +1597,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6878079b17446e4d3eba6192bb0a2950d5b14f0ed8424b852310e5a94345d0ef" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "rustversion", @@ -1864,6 +1912,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" + [[package]] name = "unicode-xid" version = "0.2.2" diff --git a/deploy/helm/commons-operator/templates/roles.yaml b/deploy/helm/commons-operator/templates/roles.yaml new file mode 100644 index 0000000..0fee2c9 --- /dev/null +++ b/deploy/helm/commons-operator/templates/roles.yaml @@ -0,0 +1,31 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ .Release.Name }}-clusterrole +rules: + - apiGroups: + - "" + resources: + - pods + - configmaps + - secrets + verbs: + - get + - list + - watch + - apiGroups: + - apps + resources: + - statefulsets + verbs: + - get + - list + - watch + - patch # We need to add a label to the StatefulSet + - apiGroups: + - events.k8s.io + resources: + - events + verbs: + - create diff --git a/deploy/manifests/roles.yaml b/deploy/manifests/roles.yaml new file mode 100644 index 0000000..36b2e7d --- /dev/null +++ b/deploy/manifests/roles.yaml @@ -0,0 +1,31 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: commons-operator-clusterrole +rules: + - apiGroups: + - "" + resources: + - pods + - configmaps + - secrets + verbs: + - get + - list + - watch + - apiGroups: + - apps + resources: + - statefulsets + verbs: + - get + - list + - watch + - patch # We need to add a label to the StatefulSet + - apiGroups: + - events.k8s.io + resources: + - events + verbs: + - create diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index b8c841e..428d6e9 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -10,10 +10,14 @@ repository = "https://github.com/stackabletech/commons-operator" [dependencies] anyhow = "1.0" clap = "3.1" +futures = { version = "0.3", features = ["compat"] } serde = "1.0" +serde_json = "1.0" serde_yaml = "0.8" +snafu = "0.7" stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.15.0" } stackable-commons-crd = { path = "../crd" } +strum = { version = "0.24", features = ["derive"] } tokio = { version = "1.17", features = ["macros", "rt-multi-thread"] } [build-dependencies] diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 33f0ac0..c6e92d2 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,3 +1,5 @@ +mod restart_controller; + use stackable_operator::cli::Command; use clap::Parser; @@ -23,7 +25,21 @@ async fn main() -> anyhow::Result<()> { match opts.cmd { Command::Crd => println!("{}", serde_yaml::to_string(&AuthenticationClass::crd())?,), Command::Run(_) => { - todo!(); + stackable_operator::utils::print_startup_string( + built_info::PKG_DESCRIPTION, + built_info::PKG_VERSION, + built_info::GIT_VERSION, + built_info::TARGET, + built_info::BUILT_TIME_UTC, + built_info::RUSTC_VERSION, + ); + + let client = stackable_operator::client::create_client(Some( + "commons.stackable.tech".to_string(), + )) + .await?; + + restart_controller::start(&client).await? } } diff --git a/rust/operator-binary/src/restart_controller.rs b/rust/operator-binary/src/restart_controller.rs new file mode 100644 index 0000000..5bdc7f5 --- /dev/null +++ b/rust/operator-binary/src/restart_controller.rs @@ -0,0 +1,316 @@ +use std::collections::BTreeMap; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::time::Duration; + +use futures::{stream, Stream, StreamExt, TryStream}; +use serde_json::json; +use snafu::{ResultExt, Snafu}; +use stackable_operator::client::Client; +use stackable_operator::k8s_openapi::api::apps::v1::StatefulSet; +use stackable_operator::k8s_openapi::api::core::v1::{ + ConfigMap, EnvFromSource, EnvVar, PodSpec, Secret, Volume, +}; +use stackable_operator::kube; +use stackable_operator::kube::api::{ListParams, Patch, PatchParams}; +use stackable_operator::kube::core::DynamicObject; +use stackable_operator::kube::runtime::controller::{ + trigger_self, trigger_with, Action, Context, ReconcileRequest, +}; +use stackable_operator::kube::runtime::reflector::{ObjectRef, Store}; +use stackable_operator::kube::runtime::utils::{try_flatten_applied, try_flatten_touched}; +use stackable_operator::kube::runtime::{applier, reflector, watcher}; +use stackable_operator::kube::{Resource, ResourceExt}; +use stackable_operator::logging::controller::{report_controller_reconciled, ReconcilerError}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +struct Ctx { + kube: kube::Client, + cms: Store, + cms_inited: Arc, + secrets: Store, + secrets_inited: Arc, +} + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +enum Error { + #[snafu(display("failed to patch object {}", obj_ref))] + PatchFailed { + source: kube::Error, + obj_ref: Box>, + }, + #[snafu(display("configmaps were not yet loaded"))] + ConfigMapsUninitialized, + #[snafu(display("secrets were not yet loaded"))] + SecretsUninitialized, +} + +impl ReconcilerError for Error { + fn category(&self) -> &'static str { + ErrorDiscriminants::from(self).into() + } + + fn secondary_object(&self) -> Option> { + match self { + Error::PatchFailed { obj_ref, .. } => Some(*obj_ref.clone()), + Error::ConfigMapsUninitialized => None, + Error::SecretsUninitialized => None, + } + } +} + +pub async fn start(client: &Client) -> anyhow::Result<()> { + let kube = kube::Client::try_default().await?; + let stses = kube::Api::::all(kube.clone()); + let cms = kube::Api::::all(kube.clone()); + let secrets = kube::Api::::all(kube.clone()); + let sts_store = reflector::store::Writer::new(()); + let cm_store = reflector::store::Writer::new(()); + let secret_store = reflector::store::Writer::new(()); + let cms_inited = Arc::new(AtomicBool::from(false)); + let secrets_inited = Arc::new(AtomicBool::from(false)); + + applier( + |sts, ctx| Box::pin(reconcile(sts, ctx)), + error_policy, + Context::new(Ctx { + kube, + cms: cm_store.as_reader(), + secrets: secret_store.as_reader(), + cms_inited: cms_inited.clone(), + secrets_inited: secrets_inited.clone(), + }), + sts_store.as_reader(), + stream::select( + stream::select( + trigger_all( + try_flatten_touched( + reflector(cm_store, watcher(cms, ListParams::default())).inspect(|_| { + cms_inited.store(true, std::sync::atomic::Ordering::SeqCst) + }), + ), + sts_store.as_reader(), + ), + trigger_all( + try_flatten_touched( + reflector(secret_store, watcher(secrets, ListParams::default())).inspect( + |_| secrets_inited.store(true, std::sync::atomic::Ordering::SeqCst), + ), + ), + sts_store.as_reader(), + ), + ), + trigger_self( + try_flatten_applied(reflector( + sts_store, + watcher( + stses, + ListParams::default().labels("restarter.stackable.tech/enabled=true"), + ), + )), + (), + ), + ), + ) + .for_each(|res| async move { + report_controller_reconciled(client, "commons.superset.stackable.tech", &res) + }) + .await; + + Ok(()) +} + +fn trigger_all( + stream: S, + store: Store, +) -> impl Stream, S::Error>> +where + S: TryStream, + K: Resource + Clone, +{ + trigger_with(stream, move |_| { + store + .state() + .into_iter() + .map(|obj| ObjectRef::from_obj(obj.as_ref())) + }) +} + +fn find_pod_refs<'a, K: Resource + 'a>( + pod_spec: &'a PodSpec, + volume_ref: impl Fn(&Volume) -> Option> + 'a, + env_var_ref: impl Fn(&EnvVar) -> Option> + 'a, + env_from_ref: impl Fn(&EnvFromSource) -> Option> + 'a, +) -> impl Iterator> + 'a { + let volume_refs = pod_spec.volumes.iter().flatten().flat_map(volume_ref); + let pod_containers = pod_spec + .containers + .iter() + .chain(pod_spec.init_containers.iter().flatten()); + let container_env_var_refs = pod_containers + .clone() + .flat_map(|container| &container.env) + .flatten() + .flat_map(env_var_ref); + let container_env_from_refs = pod_containers + .flat_map(|container| &container.env_from) + .flatten() + .flat_map(env_from_ref); + volume_refs + .chain(container_env_var_refs) + .chain(container_env_from_refs) +} + +async fn reconcile(sts: Arc, ctx: Context) -> Result { + if !ctx + .get_ref() + .cms_inited + .load(std::sync::atomic::Ordering::SeqCst) + { + return ConfigMapsUninitializedSnafu.fail(); + } + if !ctx + .get_ref() + .secrets_inited + .load(std::sync::atomic::Ordering::SeqCst) + { + return SecretsUninitializedSnafu.fail(); + } + + let ns = sts.metadata.namespace.as_deref().unwrap(); + let mut annotations = BTreeMap::::new(); + let pod_specs = sts + .spec + .iter() + .flat_map(|sts_spec| sts_spec.template.spec.as_ref()); + let cm_refs = pod_specs + .clone() + .flat_map(|pod_spec| { + find_pod_refs( + pod_spec, + |volume| { + Some(ObjectRef::::new( + volume.config_map.as_ref()?.name.as_deref()?, + )) + }, + |env_var| { + Some(ObjectRef::::new( + env_var + .value_from + .as_ref()? + .config_map_key_ref + .as_ref()? + .name + .as_deref()?, + )) + }, + |env_from| { + Some(ObjectRef::::new( + env_from.config_map_ref.as_ref()?.name.as_deref()?, + )) + }, + ) + }) + .map(|cm_ref| cm_ref.within(ns)); + annotations.extend( + cm_refs + .flat_map(|cm_ref| ctx.get_ref().cms.get(&cm_ref)) + .flat_map(|cm| { + Some(( + format!( + "configmap.restarter.stackable.tech/{}", + cm.metadata.name.as_ref()? + ), + format!( + "{}/{}", + cm.metadata.uid.as_ref()?, + cm.metadata.resource_version.as_ref()? + ), + )) + }), + ); + let secret_refs = pod_specs + .flat_map(|pod_spec| { + find_pod_refs( + pod_spec, + |volume| { + Some(ObjectRef::::new( + volume.secret.as_ref()?.secret_name.as_deref()?, + )) + }, + |env_var| { + Some(ObjectRef::::new( + env_var + .value_from + .as_ref()? + .secret_key_ref + .as_ref()? + .name + .as_deref()?, + )) + }, + |env_from| { + Some(ObjectRef::::new( + env_from.secret_ref.as_ref()?.name.as_deref()?, + )) + }, + ) + }) + .map(|secret_ref| secret_ref.within(ns)); + annotations.extend( + secret_refs + .flat_map(|secret_ref| ctx.get_ref().secrets.get(&secret_ref)) + .flat_map(|cm| { + Some(( + format!( + "secret.restarter.stackable.tech/{}", + cm.metadata.name.as_ref()? + ), + format!( + "{}/{}", + cm.metadata.uid.as_ref()?, + cm.metadata.resource_version.as_ref()? + ), + )) + }), + ); + let stses = kube::Api::::namespaced(ctx.get_ref().kube.clone(), ns); + stses + .patch( + &sts.name(), + &PatchParams { + force: true, + field_manager: Some("restarter.stackable.tech/statefulset".to_string()), + ..PatchParams::default() + }, + &Patch::Apply( + // Can't use typed API, see https://github.com/Arnavion/k8s-openapi/issues/112 + json!({ + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "metadata": { + "name": sts.metadata.name, + "namespace": sts.metadata.namespace, + "uid": sts.metadata.uid, + }, + "spec": { + "template": { + "metadata": { + "annotations": annotations, + }, + }, + }, + }), + ), + ) + .await + .context(PatchFailedSnafu { + obj_ref: ObjectRef::from_obj(sts.as_ref()).erase(), + })?; + Ok(Action::await_change()) +} + +fn error_policy(_error: &Error, _ctx: Context) -> Action { + Action::requeue(Duration::from_secs(5)) +}