Skip to content

Commit

Permalink
orchestrator: introduce service watcher
Browse files Browse the repository at this point in the history
This commit introduces the `watch_services` method to the
`NamespacedOrchestrator` trait, and corresponding implementations to the
concrete orchestrators. `watch_services` supports continuously listening
for status changes of services in the respective namespace.

The `ServiceStatus` enum is kept simple in these initial implementations:
a service can only be reported as ready, not ready, or unknown. This will
probably become more fine-grained in the future.

The Kubernetes orchestrator regards a service as ready if its "Ready"
condition is fulfilled, and not ready otherwise. The process orchestrator
does not track process status yet and reports every known process with an
unknown status.
  • Loading branch information
teskje committed Jun 10, 2022
1 parent 892e5fa commit 58135e2
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 10 deletions.
67 changes: 65 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/orchestrator-kubernetes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ publish = false
anyhow = "1.0.57"
async-trait = "0.1.56"
clap = { version = "3.1.18", features = ["derive"] }
futures = "0.3.21"
mz-orchestrator = { path = "../orchestrator" }
k8s-openapi = { version = "0.15.0", features = ["v1_22"] }
kube = { version = "0.73.1", features = ["ws"] }
kube = { version = "0.73.1", features = ["runtime", "ws"] }
serde_json = "1.0.81"
sha2 = "0.10.2"
57 changes: 54 additions & 3 deletions src/orchestrator-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ use std::fmt;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;

use anyhow::bail;
use anyhow::{anyhow, bail};
use async_trait::async_trait;
use clap::ArgEnum;
use futures::stream::{BoxStream, StreamExt};
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec};
use k8s_openapi::api::core::v1::{
Container, ContainerPort, Pod, PodSpec, PodTemplateSpec, ResourceRequirements,
Expand All @@ -26,11 +27,13 @@ use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
use kube::client::Client;
use kube::config::{Config, KubeConfigOptions};
use kube::error::Error;
use kube::runtime::{watcher, WatchStreamExt};
use kube::ResourceExt;
use sha2::{Digest, Sha256};

use mz_orchestrator::{
NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig,
NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
ServiceStatus,
};

const FIELD_MANAGER: &str = "materialized";
Expand Down Expand Up @@ -148,6 +151,18 @@ impl fmt::Debug for NamespacedKubernetesOrchestrator {
}
}

impl NamespacedKubernetesOrchestrator {
    /// Builds a `ListParams` whose label selector restricts results to
    /// Kubernetes objects belonging to this orchestrator's namespace.
    fn list_params(&self) -> ListParams {
        ListParams::default().labels(&format!(
            "materialized.materialize.cloud/namespace={}",
            self.namespace
        ))
    }
}

#[async_trait]
impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
async fn ensure_service(
Expand Down Expand Up @@ -402,7 +417,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

/// Lists the identifiers of all known services.
async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
let stateful_sets = self.stateful_set_api.list(&ListParams::default()).await?;
let stateful_sets = self.stateful_set_api.list(&self.list_params()).await?;
let name_prefix = format!("{}-", self.namespace);
Ok(stateful_sets
.into_iter()
Expand All @@ -415,6 +430,42 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {
})
.collect())
}

fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
fn into_service_event(pod: Pod) -> Result<ServiceEvent, anyhow::Error> {
let process_id = pod.name().split('-').last().unwrap().parse()?;
let service_id_label = "materialized.materialize.cloud/service-id";
let service_id = pod
.labels()
.get(service_id_label)
.ok_or_else(|| anyhow!("missing label: {service_id_label}"))?
.clone();

let pod_ready = pod
.status
.and_then(|status| status.conditions)
.and_then(|conditions| conditions.into_iter().find(|c| c.type_ == "Ready"))
.map(|c| c.status == "True")
.unwrap_or(false);

let status = if pod_ready {
ServiceStatus::Ready
} else {
ServiceStatus::NotReady
};

Ok(ServiceEvent {
service_id,
process_id,
status,
})
}

let stream = watcher(self.pod_api.clone(), self.list_params())
.touched_objects()
.map(|object| object.map_err(Into::into).and_then(into_service_event));
Box::pin(stream)
}
}

#[derive(Debug, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions src/orchestrator-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ publish = false

[dependencies]
anyhow = "1.0.57"
async-stream = "0.3.3"
async-trait = "0.1.56"
futures = "0.3.21"
itertools = "0.10.3"
mz-orchestrator = { path = "../orchestrator" }
mz-ore = { path = "../ore" }
Expand Down
41 changes: 38 additions & 3 deletions src/orchestrator-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex};

use anyhow::anyhow;
use async_trait::async_trait;
use futures::stream::BoxStream;
use itertools::Itertools;
use scopeguard::defer;
use sysinfo::{ProcessExt, ProcessStatus, SystemExt};
Expand All @@ -27,7 +28,8 @@ use tokio::time::{self, Duration};
use tracing::{error, info};

use mz_orchestrator::{
NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig,
NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
ServiceStatus,
};
use mz_ore::id_gen::PortAllocator;
use mz_pid_file::PidFile;
Expand Down Expand Up @@ -103,7 +105,7 @@ impl Orchestrator for ProcessOrchestrator {
image_dir: self.image_dir.clone(),
port_allocator: Arc::clone(&self.port_allocator),
suppress_output: self.suppress_output,
supervisors: Mutex::new(HashMap::new()),
supervisors: Arc::new(Mutex::new(HashMap::new())),
data_dir: self.data_dir.clone(),
command_wrapper: self.command_wrapper.clone(),
})
Expand All @@ -117,7 +119,7 @@ struct NamespacedProcessOrchestrator {
image_dir: PathBuf,
port_allocator: Arc<PortAllocator>,
suppress_output: bool,
supervisors: Mutex<HashMap<String, Vec<AbortOnDrop>>>,
supervisors: Arc<Mutex<HashMap<String, Vec<AbortOnDrop>>>>,
data_dir: PathBuf,
command_wrapper: Vec<String>,
}
Expand Down Expand Up @@ -255,6 +257,39 @@ impl NamespacedOrchestrator for NamespacedProcessOrchestrator {
let supervisors = self.supervisors.lock().expect("lock poisoned");
Ok(supervisors.keys().cloned().collect())
}

fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
// The process orchestrator currently doesn't provide good support for
// tracking service status, so we punt and always return an "unknown"
// status instead. We can still report the existence of individual
// processes though.

let supervisors = Arc::clone(&self.supervisors);
let stream = async_stream::stream! {
let mut events = Vec::new();
loop {
{
let supervisors = supervisors.lock().expect("lock poisoned");
for (service, processes) in supervisors.iter() {
for process_idx in 0..processes.len() {
events.push(ServiceEvent {
service_id: service.to_string(),
process_id: process_idx as i64,
status: ServiceStatus::Unknown,
});
}
}
}

for event in events.drain(..) {
yield Ok(event);
}

time::sleep(Duration::from_secs(5)).await;
}
};
Box::pin(stream)
}
}

async fn supervise(
Expand Down
1 change: 1 addition & 0 deletions src/orchestrator-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false
anyhow = "1.0.57"
async-trait = "0.1.56"
clap = { version = "3.1.18", features = ["env", "derive"] }
futures-core = "0.3.21"
http = "0.2.7"
mz-orchestrator = { path = "../orchestrator" }
mz-ore = { path = "../ore", features = ["tracing_"] }
Expand Down
7 changes: 6 additions & 1 deletion src/orchestrator-tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Duration;

use async_trait::async_trait;
use clap::{FromArgMatches, IntoApp};
use futures_core::stream::BoxStream;
use http::header::{HeaderName, HeaderValue};
use opentelemetry::sdk::resource::Resource;
use opentelemetry::KeyValue;
Expand All @@ -28,7 +29,7 @@ use tracing_subscriber::filter::Targets;
#[cfg(feature = "tokio-console")]
use mz_orchestrator::ServicePort;
use mz_orchestrator::{
NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig,
NamespacedOrchestrator, Orchestrator, Service, ServiceAssignments, ServiceConfig, ServiceEvent,
};
use mz_ore::cli::KeyValueArg;
#[cfg(feature = "tokio-console")]
Expand Down Expand Up @@ -327,6 +328,10 @@ impl NamespacedOrchestrator for NamespacedTracingOrchestrator {
async fn list_services(&self) -> Result<Vec<String>, anyhow::Error> {
self.inner.list_services().await
}

    // The tracing wrapper adds no service-level behavior of its own, so
    // watching is delegated directly to the wrapped orchestrator.
    fn watch_services(&self) -> BoxStream<'static, Result<ServiceEvent, anyhow::Error>> {
        self.inner.watch_services()
    }
}

/// Wraps [`Targets`] to provide a [`Display`](fmt::Display) implementation.
Expand Down
1 change: 1 addition & 0 deletions src/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ anyhow = "1.0.57"
async-trait = "0.1.56"
bytesize = "1.1.0"
derivative = "2.2.0"
futures-core = "0.3.21"
serde = "1.0"
Loading

0 comments on commit 58135e2

Please sign in to comment.