Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable updating host labels via the control interface #780

Merged
merged 3 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/control-interface/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,22 @@ pub fn clear_config(
)
}

pub fn put_label(topic_prefix: &Option<String>, lattice_prefix: &str, host_id: &str) -> String {
format!(
"{}.labels.{}.put",
prefix(topic_prefix, lattice_prefix),
host_id
)
}

pub fn delete_label(topic_prefix: &Option<String>, lattice_prefix: &str, host_id: &str) -> String {
format!(
"{}.labels.{}.del",
prefix(topic_prefix, lattice_prefix),
host_id
)
}

pub mod commands {
use super::prefix;

Expand Down
41 changes: 41 additions & 0 deletions crates/control-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,47 @@ impl Client {
}
}

/// Put a new (or update an existing) label on the given host.
///
/// # Errors
///
/// Will return an error if there is a communication problem with the host
pub async fn put_label(
&self,
host_id: &str,
key: &str,
value: &str,
) -> Result<CtlOperationAck> {
let subject = broker::put_label(&self.topic_prefix, &self.lattice_prefix, host_id);
debug!(%subject, "putting label");
let bytes = json_serialize(HostLabel {
key: key.to_string(),
value: value.to_string(),
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive put label acknowledgement: {e}").into()),
}
}

/// Removes a label from the given host.
///
/// # Errors
///
/// Will return an error if there is a communication problem with the host
pub async fn delete_label(&self, host_id: &str, key: &str) -> Result<CtlOperationAck> {
let subject = broker::delete_label(&self.topic_prefix, &self.lattice_prefix, host_id);
debug!(%subject, "removing label");
vados-cosmonic marked this conversation as resolved.
Show resolved Hide resolved
let bytes = json_serialize(HostLabel {
key: key.to_string(),
value: String::new(), // value isn't parsed by the host
})?;
match self.request_timeout(subject, bytes, self.timeout).await {
Ok(msg) => Ok(json_deserialize(&msg.payload)?),
Err(e) => Err(format!("Did not receive remove label acknowledgement: {e}").into()),
vados-cosmonic marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Issue a command to a host instructing that it replace an existing actor (indicated by its
/// public key) with a new actor indicated by an OCI image reference. The host will acknowledge
/// this request as soon as it verifies that the target actor is running. This acknowledgement
Expand Down
6 changes: 6 additions & 0 deletions crates/control-interface/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,3 +418,9 @@ pub struct LinkDefinition {
pub contract_id: String,
pub values: LinkSettings,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct HostLabel {
pub key: String,
pub value: String,
}
133 changes: 95 additions & 38 deletions crates/host/src/wasmbus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ use ulid::Ulid;
use uuid::Uuid;
use wascap::{jwt, prelude::ClaimsBuilder};
use wasmcloud_control_interface::{
ActorAuctionAck, ActorAuctionRequest, ActorDescription, HostInventory, LinkDefinition,
LinkDefinitionList, ProviderAuctionAck, ProviderAuctionRequest, ProviderDescription,
RegistryCredential, RegistryCredentialMap, RemoveLinkDefinitionRequest, ScaleActorCommand,
StartProviderCommand, StopActorCommand, StopHostCommand, StopProviderCommand,
UpdateActorCommand,
ActorAuctionAck, ActorAuctionRequest, ActorDescription, HostInventory, HostLabel,
LinkDefinition, LinkDefinitionList, ProviderAuctionAck, ProviderAuctionRequest,
ProviderDescription, RegistryCredential, RegistryCredentialMap, RemoveLinkDefinitionRequest,
ScaleActorCommand, StartProviderCommand, StopActorCommand, StopHostCommand,
StopProviderCommand, UpdateActorCommand,
};
use wasmcloud_core::chunking::{ChunkEndpoint, CHUNK_RPC_EXTRA_TIME, CHUNK_THRESHOLD_BYTES};
use wasmcloud_core::{
Expand All @@ -78,6 +78,7 @@ struct Queue {
commands: async_nats::Subscriber,
pings: async_nats::Subscriber,
inventory: async_nats::Subscriber,
labels: async_nats::Subscriber,
links: async_nats::Subscriber,
queries: async_nats::Subscriber,
registries: async_nats::Subscriber,
Expand All @@ -95,6 +96,11 @@ impl Stream for Queue {
Poll::Ready(None) => {}
Poll::Pending => pending = true,
}
match Pin::new(&mut self.labels).poll_next(cx) {
Poll::Ready(Some(msg)) => return Poll::Ready(Some(msg)),
Poll::Ready(None) => {}
Poll::Pending => pending = true,
}
match Pin::new(&mut self.links).poll_next(cx) {
Poll::Ready(Some(msg)) => return Poll::Ready(Some(msg)),
Poll::Ready(None) => {}
Expand Down Expand Up @@ -194,36 +200,50 @@ impl Queue {
host_key: &KeyPair,
) -> anyhow::Result<Self> {
let host_id = host_key.public_key();
let (registries, pings, links, queries, auction, commands, inventory, config, config_get) =
try_join!(
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.registries.put",)),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.ping.hosts",)),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.linkdefs.*"),
format!("{topic_prefix}.{lattice_prefix}.linkdefs",)
),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.get.*"),
format!("{topic_prefix}.{lattice_prefix}.get")
),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.auction.>",)),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.cmd.{host_id}.*",)),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.get.{host_id}.inv",)),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.config.>"),
format!("{topic_prefix}.{lattice_prefix}.config"),
),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.get.config.>"),
format!("{topic_prefix}.{lattice_prefix}.get.config")
),
)
.context("failed to subscribe to queues")?;
let (
registries,
pings,
links,
queries,
auction,
commands,
inventory,
labels,
config,
config_get,
) = try_join!(
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.registries.put",)),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.ping.hosts",)),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.linkdefs.*"),
format!("{topic_prefix}.{lattice_prefix}.linkdefs",)
),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.get.*"),
format!("{topic_prefix}.{lattice_prefix}.get")
),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.auction.>",)),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.cmd.{host_id}.*",)),
nats.subscribe(format!("{topic_prefix}.{lattice_prefix}.get.{host_id}.inv",)),
nats.subscribe(format!(
"{topic_prefix}.{lattice_prefix}.labels.{host_id}.*",
)),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.config.>"),
format!("{topic_prefix}.{lattice_prefix}.config"),
),
nats.queue_subscribe(
format!("{topic_prefix}.{lattice_prefix}.get.config.>"),
format!("{topic_prefix}.{lattice_prefix}.get.config")
),
)
.context("failed to subscribe to queues")?;
Ok(Self {
auction,
commands,
pings,
inventory,
labels,
links,
queries,
registries,
Expand Down Expand Up @@ -1557,7 +1577,7 @@ pub struct Host {
heartbeat: AbortHandle,
host_config: HostConfig,
host_key: Arc<KeyPair>,
labels: HashMap<String, String>,
labels: RwLock<HashMap<String, String>>,
ctl_topic_prefix: String,
/// NATS client to use for control interface subscriptions and jetstream queries
ctl_nats: async_nats::Client,
Expand Down Expand Up @@ -2045,7 +2065,7 @@ impl Host {
heartbeat: heartbeat_abort.clone(),
ctl_topic_prefix: config.ctl_topic_prefix.clone(),
host_key,
labels,
labels: RwLock::new(labels),
ctl_nats,
rpc_nats,
host_config: config,
Expand Down Expand Up @@ -2253,7 +2273,7 @@ impl Host {
host.publish_event(
"host_stopped",
json!({
"labels": host.labels,
"labels": *host.labels.read().await,
}),
)
.await
Expand Down Expand Up @@ -2330,7 +2350,7 @@ impl Host {
json!({
"actors": actors,
"friendly_name": self.friendly_name,
"labels": self.labels,
"labels": *self.labels.read().await,
"providers": providers,
"uptime_human": human_friendly_uptime(uptime),
"uptime_seconds": uptime.as_secs(),
Expand Down Expand Up @@ -3629,7 +3649,7 @@ impl Host {
let buf = serde_json::to_vec(&HostInventory {
host_id: self.host_key.public_key(),
issuer: self.cluster_key.public_key(),
labels: self.labels.clone(),
labels: self.labels.read().await.clone(),
friendly_name: self.friendly_name.clone(),
actors,
providers,
Expand Down Expand Up @@ -3675,7 +3695,7 @@ impl Host {

#[instrument(level = "trace", skip_all)]
async fn handle_config_get_one(&self, entity_id: &str, key: &str) -> anyhow::Result<Bytes> {
trace!("getting config");
trace!(%entity_id, %key, "handling config");
let json = match self
.config_data_cache
.read()
Expand All @@ -3701,7 +3721,7 @@ impl Host {

#[instrument(level = "trace", skip(self))]
async fn handle_config_get(&self, entity_id: &str) -> anyhow::Result<Bytes> {
trace!(%entity_id, "getting all config");
trace!(%entity_id, "handling all config");
self.config_data_cache
.read()
.await
Expand All @@ -3716,6 +3736,37 @@ impl Host {
)
}

#[instrument(level = "debug", skip_all)]
async fn handle_label_put(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<Bytes> {
let HostLabel { key, value } = serde_json::from_slice(payload.as_ref())
.context("failed to deserialize put label request")?;
let mut labels = self.labels.write().await;
match labels.entry(key) {
Entry::Occupied(mut entry) => {
info!(key = entry.key(), value, "updated label");
entry.insert(value);
}
Entry::Vacant(entry) => {
info!(key = entry.key(), value, "set label");
entry.insert(value);
}
}
Ok(ACCEPTED.into())
}

#[instrument(level = "debug", skip_all)]
async fn handle_label_del(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<Bytes> {
let HostLabel { key, .. } = serde_json::from_slice(payload.as_ref())
.context("failed to deserialize delete label request")?;
let mut labels = self.labels.write().await;
if labels.remove(&key).is_some() {
info!(key, "removed label");
} else {
warn!(key, "could not remove unset label");
}
Ok(ACCEPTED.into())
}

#[instrument(level = "debug", skip_all)]
async fn handle_linkdef_put(&self, payload: impl AsRef<[u8]>) -> anyhow::Result<Bytes> {
let payload = payload.as_ref();
Expand Down Expand Up @@ -3862,7 +3913,7 @@ impl Host {
let buf = serde_json::to_vec(&json!({
"id": self.host_key.public_key(),
"issuer": self.cluster_key.public_key(),
"labels": self.labels,
"labels": *self.labels.read().await,
"friendly_name": self.friendly_name,
"uptime_seconds": uptime.as_secs(),
"uptime_human": human_friendly_uptime(uptime),
Expand Down Expand Up @@ -3940,6 +3991,12 @@ impl Host {
(Some("get"), Some("config"), Some(entity_id), None) => {
self.handle_config_get(entity_id).await.map(Some)
}
(Some("labels"), Some(_host_id), Some("del"), None) => {
self.handle_label_del(message.payload).await.map(Some)
}
(Some("labels"), Some(_host_id), Some("put"), None) => {
self.handle_label_put(message.payload).await.map(Some)
}
(Some("linkdefs"), Some("put"), None, None) => {
self.handle_linkdef_put(message.payload).await.map(Some)
}
Expand Down