From 58135e24b8138a32ce42f09930cf3708a54fc9c6 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Tue, 7 Jun 2022 11:04:15 +0200 Subject: [PATCH] orchestrator: introduce service watcher This commit introduces the `watch_services` method to the `NamespacedOrchestrator` trait, and corresponding implementations to the concrete orchestrators. `watch_services` supports continuously listening for status changes of services in the respective namespace. The `ServiceStatus` enum is kept simple in the initial implementations, services can only be ready or not ready. This will probably become more fine-grained in the future. The kubernetes orchestrator regards a service as ready if its "Ready" condition is fulfilled, and not ready otherwise. The process orchestrator simply considers every running process as ready. --- Cargo.lock | 67 +++++++++++++++++++++++++- src/orchestrator-kubernetes/Cargo.toml | 3 +- src/orchestrator-kubernetes/src/lib.rs | 57 ++++++++++++++++++++-- src/orchestrator-process/Cargo.toml | 2 + src/orchestrator-process/src/lib.rs | 41 ++++++++++++++-- src/orchestrator-tracing/Cargo.toml | 1 + src/orchestrator-tracing/src/lib.rs | 7 ++- src/orchestrator/Cargo.toml | 1 + src/orchestrator/src/lib.rs | 23 +++++++++ 9 files changed, 192 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a47cfb30236a..6bd5c15c5896 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -668,6 +668,17 @@ dependencies = [ "mime", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom", + "instant", + "rand", +] + [[package]] name = "backtrace" version = "0.3.65" @@ -2410,6 +2421,17 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" +[[package]] +name = "json-patch" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f995a3c8f2bc3dd52a18a583e90f9ec109c047fa1603a853e46bcda14d2e279d" +dependencies = [ + "serde", + "serde_json", + "treediff", +] + [[package]] name = "jsonpath_lib" version = "0.3.0" @@ -2474,6 +2496,7 @@ dependencies = [ "k8s-openapi", "kube-client", "kube-core", + "kube-runtime", ] [[package]] @@ -2522,6 +2545,7 @@ dependencies = [ "chrono", "form_urlencoded", "http", + "json-patch", "k8s-openapi", "once_cell", "serde", @@ -2529,6 +2553,30 @@ dependencies = [ "thiserror", ] +[[package]] +name = "kube-runtime" +version = "0.73.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6e9e9da456f0101b77f864a9da44866b9891ad4740db508b4b269343ebeb01d" +dependencies = [ + "ahash", + "backoff", + "derivative", + "futures", + "json-patch", + "k8s-openapi", + "kube-client", + "parking_lot", + "pin-project", + "serde", + "serde_json", + "smallvec", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -3452,6 +3500,7 @@ dependencies = [ "async-trait", "bytesize", "derivative", + "futures-core", "serde", ] @@ -3462,6 +3511,7 @@ dependencies = [ "anyhow", "async-trait", "clap", + "futures", "k8s-openapi", "kube", "mz-orchestrator", @@ -3474,7 +3524,9 @@ name = "mz-orchestrator-process" version = "0.0.0" dependencies = [ "anyhow", + "async-stream", "async-trait", + "futures", "itertools", "mz-orchestrator", "mz-ore", @@ -3494,6 +3546,7 @@ dependencies = [ "anyhow", "async-trait", "clap", + "futures-core", "http", "mz-orchestrator", "mz-ore", @@ -5642,9 +5695,9 @@ checksum = "fa8f3741c7372e75519bd9346068370c9cdaabcc1f9599cbcf2a2719352286b7" [[package]] name = "slab" -version = "0.4.2" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "smallvec" @@ -6235,6 +6288,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite", + "slab", "tokio", "tracing", ] @@ -6433,6 +6487,15 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "treediff" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "761e8d5ad7ce14bb82b7e61ccc0ca961005a275a060b9644a2431aa11553c2ff" +dependencies = [ + "serde_json", +] + [[package]] name = "treeline" version = "0.1.0" diff --git a/src/orchestrator-kubernetes/Cargo.toml b/src/orchestrator-kubernetes/Cargo.toml index 7c344e32695e..6a120dd9f067 100644 --- a/src/orchestrator-kubernetes/Cargo.toml +++ b/src/orchestrator-kubernetes/Cargo.toml @@ -10,8 +10,9 @@ publish = false anyhow = "1.0.57" async-trait = "0.1.56" clap = { version = "3.1.18", features = ["derive"] } +futures = "0.3.21" mz-orchestrator = { path = "../orchestrator" } k8s-openapi = { version = "0.15.0", features = ["v1_22"] } -kube = { version = "0.73.1", features = ["ws"] } +kube = { version = "0.73.1", features = ["runtime", "ws"] } serde_json = "1.0.81" sha2 = "0.10.2" diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs index 9b2cce57b112..38d26d2f852f 100644 --- a/src/orchestrator-kubernetes/src/lib.rs +++ b/src/orchestrator-kubernetes/src/lib.rs @@ -12,9 +12,10 @@ use std::fmt; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; -use anyhow::bail; +use anyhow::{anyhow, bail}; use async_trait::async_trait; use clap::ArgEnum; +use futures::stream::{BoxStream, StreamExt}; use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec}; use k8s_openapi::api::core::v1::{ Container, ContainerPort, Pod, PodSpec, PodTemplateSpec, ResourceRequirements, @@ -26,11 +27,13 @@ use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams}; use kube::client::Client; use kube::config::{Config, KubeConfigOptions}; use kube::error::Error; +use kube::runtime::{watcher, WatchStreamExt}; use kube::ResourceExt; use sha2::{Digest, Sha256}; use mz_orchestrator::{ - NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, + NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent, + ServiceStatus, }; const FIELD_MANAGER: &str = "materialized"; @@ -148,6 +151,18 @@ impl fmt::Debug for NamespacedKubernetesOrchestrator { } } +impl NamespacedKubernetesOrchestrator { + /// Return a `ListParams` instance that limits results to the namespace + /// assigned to this orchestrator. + fn list_params(&self) -> ListParams { + let ns_selector = format!( + "materialized.materialize.cloud/namespace={}", + self.namespace + ); + ListParams::default().labels(&ns_selector) + } +} + #[async_trait] impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { async fn ensure_service( @@ -402,7 +417,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { /// Lists the identifiers of all known services. async fn list_services(&self) -> Result, anyhow::Error> { - let stateful_sets = self.stateful_set_api.list(&ListParams::default()).await?; + let stateful_sets = self.stateful_set_api.list(&self.list_params()).await?; let name_prefix = format!("{}-", self.namespace); Ok(stateful_sets .into_iter() @@ -415,6 +430,42 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator { }) .collect()) } + + fn watch_services(&self) -> BoxStream<'static, Result> { + fn into_service_event(pod: Pod) -> Result { + let process_id = pod.name().split('-').last().unwrap().parse()?; + let service_id_label = "materialized.materialize.cloud/service-id"; + let service_id = pod + .labels() + .get(service_id_label) + .ok_or_else(|| anyhow!("missing label: {service_id_label}"))? + .clone(); + + let pod_ready = pod + .status + .and_then(|status| status.conditions) + .and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready")) + .map(|c| c.status == "True") + .unwrap_or(false); + + let status = if pod_ready { + ServiceStatus::Ready + } else { + ServiceStatus::NotReady + }; + + Ok(ServiceEvent { + service_id, + process_id, + status, + }) + } + + let stream = watcher(self.pod_api.clone(), self.list_params()) + .touched_objects() + .map(|object| object.map_err(Into::into).and_then(into_service_event)); + Box::pin(stream) + } } #[derive(Debug, Clone)] diff --git a/src/orchestrator-process/Cargo.toml b/src/orchestrator-process/Cargo.toml index f52dad471030..2d25ff1600cf 100644 --- a/src/orchestrator-process/Cargo.toml +++ b/src/orchestrator-process/Cargo.toml @@ -8,7 +8,9 @@ publish = false [dependencies] anyhow = "1.0.57" +async-stream = "0.3.3" async-trait = "0.1.56" +futures = "0.3.21" itertools = "0.10.3" mz-orchestrator = { path = "../orchestrator" } mz-ore = { path = "../ore" } diff --git a/src/orchestrator-process/src/lib.rs b/src/orchestrator-process/src/lib.rs index dc70c2231260..9d9531fefa3e 100644 --- a/src/orchestrator-process/src/lib.rs +++ b/src/orchestrator-process/src/lib.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex}; use anyhow::anyhow; use async_trait::async_trait; +use futures::stream::BoxStream; use itertools::Itertools; use scopeguard::defer; use sysinfo::{ProcessExt, ProcessStatus, SystemExt}; @@ -27,7 +28,8 @@ use tokio::time::{self, Duration}; use tracing::{error, info}; use mz_orchestrator::{ - NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, + NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent, + ServiceStatus, }; use mz_ore::id_gen::PortAllocator; use mz_pid_file::PidFile; @@ -103,7 +105,7 @@ impl Orchestrator for ProcessOrchestrator { image_dir: self.image_dir.clone(), port_allocator: Arc::clone(&self.port_allocator), suppress_output: self.suppress_output, - supervisors: Mutex::new(HashMap::new()), + supervisors: Arc::new(Mutex::new(HashMap::new())), data_dir: self.data_dir.clone(), command_wrapper: self.command_wrapper.clone(), }) @@ -117,7 +119,7 @@ struct NamespacedProcessOrchestrator { image_dir: PathBuf, port_allocator: Arc, suppress_output: bool, - supervisors: Mutex>>, + supervisors: Arc>>>, data_dir: PathBuf, command_wrapper: Vec, } @@ -255,6 +257,39 @@ impl NamespacedOrchestrator for NamespacedProcessOrchestrator { let supervisors = self.supervisors.lock().expect("lock poisoned"); Ok(supervisors.keys().cloned().collect()) } + + fn watch_services(&self) -> BoxStream<'static, Result> { + // The process orchestrator currently doesn't provide good support for + // tracking service status, so we punt and always return an "unknown" + // status instead. We can still report the existence of individual + // processes though. + + let supervisors = Arc::clone(&self.supervisors); + let stream = async_stream::stream! { + let mut events = Vec::new(); + loop { + { + let supervisors = supervisors.lock().expect("lock poisoned"); + for (service, processes) in supervisors.iter() { + for process_idx in 0..processes.len() { + events.push(ServiceEvent { + service_id: service.to_string(), + process_id: process_idx as i64, + status: ServiceStatus::Unknown, + }); + } + } + } + + for event in events.drain(..) { + yield Ok(event); + } + + time::sleep(Duration::from_secs(5)).await; + } + }; + Box::pin(stream) + } } async fn supervise( diff --git a/src/orchestrator-tracing/Cargo.toml b/src/orchestrator-tracing/Cargo.toml index 9dbbd846dab1..5fc3f8f75e7e 100644 --- a/src/orchestrator-tracing/Cargo.toml +++ b/src/orchestrator-tracing/Cargo.toml @@ -10,6 +10,7 @@ publish = false anyhow = "1.0.57" async-trait = "0.1.56" clap = { version = "3.1.18", features = ["env", "derive"] } +futures-core = "0.3.21" http = "0.2.7" mz-orchestrator = { path = "../orchestrator" } mz-ore = { path = "../ore", features = ["tracing_"] } diff --git a/src/orchestrator-tracing/src/lib.rs b/src/orchestrator-tracing/src/lib.rs index b142d74d7150..66c4f5662670 100644 --- a/src/orchestrator-tracing/src/lib.rs +++ b/src/orchestrator-tracing/src/lib.rs @@ -20,6 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use clap::{FromArgMatches, IntoApp}; +use futures_core::stream::BoxStream; use http::header::{HeaderName, HeaderValue}; use opentelemetry::sdk::resource::Resource; use opentelemetry::KeyValue; @@ -28,7 +29,7 @@ use tracing_subscriber::filter::Targets; #[cfg(feature = "tokio-console")] use mz_orchestrator::ServicePort; use mz_orchestrator::{ - NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, + NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent, }; use mz_ore::cli::KeyValueArg; #[cfg(feature = "tokio-console")] @@ -327,6 +328,10 @@ impl NamespacedOrchestrator for NamespacedTracingOrchestrator { async fn list_services(&self) -> Result, anyhow::Error> { self.inner.list_services().await } + + fn watch_services(&self) -> BoxStream<'static, Result> { + self.inner.watch_services() + } } /// Wraps [`Targets`] to provide a [`Display`](fmt::Display) implementation. diff --git a/src/orchestrator/Cargo.toml b/src/orchestrator/Cargo.toml index a38d8fcc7425..a9c080d8bf07 100644 --- a/src/orchestrator/Cargo.toml +++ b/src/orchestrator/Cargo.toml @@ -11,4 +11,5 @@ anyhow = "1.0.57" async-trait = "0.1.56" bytesize = "1.1.0" derivative = "2.2.0" +futures-core = "0.3.21" serde = "1.0" diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs index 8f84af39fa5d..afd61607b144 100644 --- a/src/orchestrator/src/lib.rs +++ b/src/orchestrator/src/lib.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytesize::ByteSize; use derivative::Derivative; +use futures_core::stream::BoxStream; use serde::de::Unexpected; use serde::{Deserialize, Deserializer, Serialize}; @@ -57,6 +58,28 @@ pub trait NamespacedOrchestrator: fmt::Debug + Send + Sync { /// Lists the identifiers of all known services. async fn list_services(&self) -> Result, anyhow::Error>; + + /// Watch for status changes of all known services. + fn watch_services(&self) -> BoxStream<'static, Result>; +} + +/// An event describing a status change of an orchestrated service. +#[derive(Debug, Clone, Serialize)] +pub struct ServiceEvent { + pub service_id: String, + pub process_id: i64, + pub status: ServiceStatus, +} + +/// Describes the status of an orchestrated service. +#[derive(Debug, Clone, Copy, Serialize)] +pub enum ServiceStatus { + /// Service is ready to accept requests. + Ready, + /// Service is not ready to accept requests. + NotReady, + /// Service status is unknown. + Unknown, } /// Describes a running service managed by an `Orchestrator`.