From e9fe020a0906cb377f6ea8bd3a9879e5bad877b7 Mon Sep 17 00:00:00 2001 From: Victor Adossi Date: Wed, 7 Jun 2023 23:50:14 +0900 Subject: [PATCH] feat: add wash dev command Signed-off-by: Victor Adossi --- Cargo.lock | 80 +++++- Cargo.toml | 4 +- crates/wash-lib/src/actor.rs | 163 +++++++++++ crates/wash-lib/src/build.rs | 1 + crates/wash-lib/src/cli/dev.rs | 68 +++++ crates/wash-lib/src/cli/mod.rs | 1 + crates/wash-lib/src/cli/output.rs | 6 + crates/wash-lib/src/cli/start.rs | 108 +++----- crates/wash-lib/src/cli/stop.rs | 75 ++--- crates/wash-lib/src/generate/emoji.rs | 6 + crates/wash-lib/src/lib.rs | 1 + crates/wash-lib/src/start/mod.rs | 8 +- crates/wash-lib/src/wait.rs | 11 +- flake.nix | 1 + src/common/start_cmd.rs | 4 +- src/common/stop_cmd.rs | 4 +- src/ctl/mod.rs | 32 +-- src/dev.rs | 376 ++++++++++++++++++++++++++ src/down/mod.rs | 2 +- src/main.rs | 13 + src/up/mod.rs | 61 ++++- tests/common.rs | 68 ++++- tests/integration_build.rs | 58 +--- tests/integration_dev.rs | 90 ++++++ tests/integration_up.rs | 16 +- 25 files changed, 1014 insertions(+), 243 deletions(-) create mode 100644 crates/wash-lib/src/actor.rs create mode 100644 crates/wash-lib/src/cli/dev.rs create mode 100644 src/dev.rs create mode 100644 tests/integration_dev.rs diff --git a/Cargo.lock b/Cargo.lock index 58260ca6..808f7980 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,16 +938,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "ctrlc" -version = "3.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a011bbe2c35ce9c1f143b7af6f94f29a167beb4cd1d29e6740ce836f723120e" -dependencies = [ - "nix 0.26.2", - "windows-sys 0.48.0", -] - [[package]] name = "curve25519-dalek" version = "3.2.0" @@ -1344,6 +1334,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "fsio" version = "0.4.0" @@ -1774,6 +1773,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -1862,6 +1881,26 @@ dependencies = [ "sha2 0.10.6", ] +[[package]] +name = "kqueue" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c8fc60ba15bf51257aa9807a48a61013db043fcf3a78cb0d916e8e396dcad98" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8367585489f01bc55dd27404dcf56b95e6da061a256a666ab23be9ba96a2e587" +dependencies = [ + "bitflags", + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2085,6 +2124,24 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d9ba6c734de18ca27c8cef5cd7058aa4ac9f63596131e4c7e41e579319032a2" +dependencies = [ + "bitflags", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "mio", + "walkdir", + "windows-sys 0.45.0", +] + [[package]] name = "ntapi" version = "0.4.1" @@ -4178,14 +4235,15 @@ dependencies = [ "clap_complete", "cloudevents-sdk 0.6.0", "console", - "ctrlc", "dirs", "env_logger 0.10.0", "envmnt", "futures", "indicatif", "log", + "nix 0.26.2", "nkeys", + "notify", "oci-distribution", "once_cell", "provider-archive", diff --git a/Cargo.toml b/Cargo.toml index 270ef80d..81cbbc77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ cargo_atelier = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } cloudevents-sdk = { workspace = true } console = { workspace = true } -ctrlc = { workspace = true } dirs = { workspace = true } env_logger = { workspace = true } envmnt = { workspace = true } @@ -58,6 +57,7 @@ wasmcloud-control-interface = { workspace = true } wasmbus-rpc = { workspace = true } wasmcloud-test-util = { workspace = true } clap_complete = { workspace = true } +notify = "6.0.0" [dev-dependencies] reqwest = { workspace = true, features = ["json", "rustls-tls"] } @@ -72,6 +72,7 @@ futures = { workspace = true } # serial_test ensures serial execution when running with cargo, '_serial' works with nextest serial_test = { workspace = true } rand = "0.8.5" +nix = { version = "0.26.2", default-features = false, features = [ "signal" ] } [[bin]] bench = true @@ -97,7 +98,6 @@ cloudevents-sdk = "0.6.0" command-group = "1.0.8" config = "0.13.1" console = "0.15" -ctrlc = "3.4.0" dialoguer = "0.10.4" dirs = "4.0" env_logger = "0.10" diff --git a/crates/wash-lib/src/actor.rs b/crates/wash-lib/src/actor.rs new file mode 100644 index 00000000..eb59f446 --- /dev/null +++ b/crates/wash-lib/src/actor.rs @@ -0,0 +1,163 @@ +use std::collections::HashMap; + +use anyhow::{bail, Context, Result}; +use tokio::time::Duration; +use wasmcloud_control_interface::Client as CtlClient; + +use crate::{ + common::boxed_err_to_anyhow, + config::DEFAULT_START_ACTOR_TIMEOUT_MS, + wait::{ + wait_for_actor_start_event, wait_for_actor_stop_event, ActorStoppedInfo, FindEventOutcome, + }, +}; + +/// Arguments required when starting an actor +pub struct StartActorArgs<'a> { + pub ctl_client: &'a CtlClient, + pub host_id: &'a str, + pub actor_ref: &'a str, + pub count: u16, + pub skip_wait: bool, + pub timeout_ms: Option, +} + +/// Information related to an actor start +pub struct ActorStartedInfo { + pub host_id: String, + pub actor_ref: String, + pub actor_id: Option, +} + +/// Start a Wasmcloud actor +pub async fn start_actor( + StartActorArgs { + ctl_client, + host_id, + actor_ref, + count, + skip_wait, + timeout_ms, + }: StartActorArgs<'_>, +) -> Result { + // If timeout isn't supplied, override with a longer timeout for starting actor + let timeout_ms = timeout_ms.unwrap_or(DEFAULT_START_ACTOR_TIMEOUT_MS); + + // Create a receiver to use with the client + let mut receiver = ctl_client + .events_receiver() + .await + .map_err(boxed_err_to_anyhow) + .context("Failed to get lattice event channel")?; + + // Start the actor + let ack = ctl_client + .start_actor(host_id, actor_ref, count, None) + .await + .map_err(boxed_err_to_anyhow) + .with_context(|| format!("Failed to start actor: {}", actor_ref))?; + + if !ack.accepted { + bail!("Start actor ack not accepted: {}", ack.error); + } + + // If skip_wait is specified, return incomplete information immediately + if skip_wait { + return Ok(ActorStartedInfo { + host_id: host_id.into(), + actor_ref: actor_ref.into(), + actor_id: None, + }); + } + + // Wait for the actor to start + let event = wait_for_actor_start_event( + &mut receiver, + Duration::from_millis(timeout_ms), + host_id.into(), + actor_ref.into(), + ) + .await + .with_context(|| { + format!( + "Timed out waitng for start event for actor [{}] on host [{}]", + actor_ref, host_id + ) + })?; + + match event { + FindEventOutcome::Success(info) => Ok(info), + FindEventOutcome::Failure(err) => Err(err).with_context(|| { + format!( + "Failed to start actor [{}] on host [{}]", + actor_ref, host_id + ) + }), + } +} + +/// Scale a Wasmcloud actor on a given host +pub async fn scale_actor( + client: &CtlClient, + host_id: &str, + actor_ref: &str, + actor_id: &str, + count: u16, + annotations: Option>, +) -> Result<()> { + let ack = client + .scale_actor(host_id, actor_ref, actor_id, count, annotations) + .await + .map_err(boxed_err_to_anyhow)?; + + if !ack.accepted { + bail!("Operation failed: {}", ack.error); + } + + Ok(()) +} + +/// Stop an actor +pub async fn stop_actor( + client: &CtlClient, + host_id: &str, + actor_id: &str, + count: u16, + annotations: Option>, + timeout_ms: u64, + skip_wait: bool, +) -> Result { + let mut receiver = client + .events_receiver() + .await + .map_err(boxed_err_to_anyhow)?; + + let ack = client + .stop_actor(host_id, actor_id, count, annotations) + .await + .map_err(boxed_err_to_anyhow)?; + + if !ack.accepted { + bail!("Operation failed: {}", ack.error); + } + + if skip_wait { + return Ok(ActorStoppedInfo { + actor_id: actor_id.into(), + host_id: host_id.into(), + }); + } + + let event = wait_for_actor_stop_event( + &mut receiver, + Duration::from_millis(timeout_ms), + host_id.to_string(), + actor_id.to_string(), + ) + .await?; + + match event { + FindEventOutcome::Success(info) => Ok(info), + FindEventOutcome::Failure(err) => Err(err), + } +} diff --git a/crates/wash-lib/src/build.rs b/crates/wash-lib/src/build.rs index 0b24dacd..30c688b0 100644 --- a/crates/wash-lib/src/build.rs +++ b/crates/wash-lib/src/build.rs @@ -15,6 +15,7 @@ use crate::parser::{ /// Configuration for signing an artifact (actor or provider) including issuer and subject key, the path to where keys can be found, and an option to /// disable automatic key generation if keys cannot be found. +#[derive(Debug, Clone)] pub struct SignConfig { /// Location of key files for signing pub keys_directory: Option, diff --git a/crates/wash-lib/src/cli/dev.rs b/crates/wash-lib/src/cli/dev.rs new file mode 100644 index 00000000..1524ae8f --- /dev/null +++ b/crates/wash-lib/src/cli/dev.rs @@ -0,0 +1,68 @@ +use anyhow::Result; +use console::style; +use wasmcloud_control_interface::Client; + +use crate::{ + actor::{start_actor, stop_actor, StartActorArgs}, + build::{build_project, SignConfig}, + context::default_timeout_ms, + generate::emoji, + id::{ModuleId, ServerId}, + parser::{ProjectConfig, TypeConfig}, +}; + +/// Perform a single execution of the dev loop for an artifact +pub async fn run_dev_loop( + project_cfg: &ProjectConfig, + actor_id: ModuleId, + actor_ref: &str, + host_id: ServerId, + ctl_client: &Client, + sign_cfg: Option, +) -> Result<()> { + let built_artifact_path = build_project(project_cfg, sign_cfg)?.canonicalize()?; + + // Restart the artifact so that changes can be observed + match project_cfg.project_type { + TypeConfig::Interface(_) | TypeConfig::Provider(_) => { + eprintln!( + "{} {}", + emoji::WARN, + style("`wash build` interfaces and providers are not yet supported, skipping...") + .bold(), + ); + } + TypeConfig::Actor(_) => { + eprintln!( + "{} {}", + emoji::RECYCLE, + style(format!( + "restarting actor @ [{}]...", + built_artifact_path.display() + )) + .bold(), + ); + stop_actor( + ctl_client, + &host_id, + &actor_id, + 1, + None, + default_timeout_ms(), + false, + ) + .await?; + start_actor(StartActorArgs { + ctl_client, + host_id: &host_id, + actor_ref, + count: 1, + skip_wait: false, + timeout_ms: None, + }) + .await?; + } + } + + Ok(()) +} diff --git a/crates/wash-lib/src/cli/mod.rs b/crates/wash-lib/src/cli/mod.rs index 427cd47d..3e00baad 100644 --- a/crates/wash-lib/src/cli/mod.rs +++ b/crates/wash-lib/src/cli/mod.rs @@ -37,6 +37,7 @@ use crate::{ pub mod capture; pub mod claims; +pub mod dev; pub mod get; pub mod inspect; pub mod link; diff --git a/crates/wash-lib/src/cli/output.rs b/crates/wash-lib/src/cli/output.rs index 19bf57a4..210f5e32 100644 --- a/crates/wash-lib/src/cli/output.rs +++ b/crates/wash-lib/src/cli/output.rs @@ -63,3 +63,9 @@ pub struct GetClaimsOutput { pub claims: GetClaimsResponse, pub success: bool, } + +/// JSON output representation of the `wash dev` command +#[derive(Debug, Deserialize)] +pub struct DevCommandOutput { + pub success: bool, +} diff --git a/crates/wash-lib/src/cli/start.rs b/crates/wash-lib/src/cli/start.rs index 3f02b932..4b5b4246 100644 --- a/crates/wash-lib/src/cli/start.rs +++ b/crates/wash-lib/src/cli/start.rs @@ -6,6 +6,7 @@ use clap::Parser; use tokio::time::Duration; use crate::{ + actor::{start_actor, ActorStartedInfo, StartActorArgs}, cli::{labels_vec_to_hashmap, CliConnectionOpts, CommandOutput}, common::boxed_err_to_anyhow, config::{ @@ -14,10 +15,7 @@ use crate::{ }, context::default_timeout_ms, id::ServerId, - wait::{ - wait_for_actor_start_event, wait_for_provider_start_event, ActorStartedInfo, - FindEventOutcome, ProviderStartedInfo, - }, + wait::{wait_for_provider_start_event, FindEventOutcome, ProviderStartedInfo}, }; #[derive(Debug, Clone, Parser)] @@ -63,7 +61,7 @@ pub struct StartActorCommand { pub skip_wait: bool, } -pub async fn start_actor(cmd: StartActorCommand) -> Result { +pub async fn handle_start_actor(cmd: StartActorCommand) -> Result { // If timeout isn't supplied, override with a longer timeout for starting actor let timeout_ms = if cmd.opts.timeout_ms == DEFAULT_NATS_TIMEOUT_MS { DEFAULT_START_ACTOR_TIMEOUT_MS @@ -100,74 +98,44 @@ pub async fn start_actor(cmd: StartActorCommand) -> Result { } }; - let mut receiver = client - .events_receiver() - .await - .map_err(boxed_err_to_anyhow) - .context("Failed to get lattice event channel")?; - - let ack = client - .start_actor(&host, &cmd.actor_ref, cmd.count, None) - .await - .map_err(boxed_err_to_anyhow) - .with_context(|| format!("Failed to start actor: {}", &cmd.actor_ref))?; - - if !ack.accepted { - bail!("Start actor ack not accepted: {}", ack.error); - } - - if cmd.skip_wait { - let text = format!( - "Start actor request received: {}, host: {}", - &cmd.actor_ref, &host - ); - return Ok(CommandOutput::new( - text.clone(), - HashMap::from([ - ("result".into(), text.into()), - ("actor_ref".into(), cmd.actor_ref.into()), - ("host_id".into(), host.to_string().into()), - ]), - )); - } - - let event = wait_for_actor_start_event( - &mut receiver, - Duration::from_millis(timeout_ms), - host.to_string(), - cmd.actor_ref.clone(), - ) - .await - .with_context(|| { + // Start the actor + let ActorStartedInfo { + host_id, + actor_ref, + actor_id, + } = start_actor(StartActorArgs { + ctl_client: &client, + host_id: &host, + actor_ref: &cmd.actor_ref, + count: cmd.count, + skip_wait: cmd.skip_wait, + timeout_ms: Some(timeout_ms), + }) + .await?; + + let text = if cmd.skip_wait { format!( - "Timed out waitng for start event for actor {} on host {}", - &cmd.actor_ref, &host + "Start actor [{}] request received on host [{}]", + actor_ref, host_id ) - })?; + } else { + format!( + "Actor [{}] (ref: [{}]) started on host [{}]", + actor_id.clone().unwrap_or("".into()), + &actor_ref, + &host_id + ) + }; - match event { - FindEventOutcome::Success(ActorStartedInfo { - actor_id, - host_id, - actor_ref, - }) => { - let text = format!( - "Actor [{}] (ref: [{}]) started on host [{}]", - &actor_id, &actor_ref, &host_id - ); - Ok(CommandOutput::new( - text.clone(), - HashMap::from([ - ("result".into(), text.into()), - ("actor_ref".into(), actor_ref.into()), - ("actor_id".into(), actor_id.into()), - ("host_id".into(), host_id.into()), - ]), - )) - } - FindEventOutcome::Failure(err) => Err(err) - .with_context(|| format!("Failed to start actor {} on host {}", &cmd.actor_ref, &host)), - } + Ok(CommandOutput::new( + text.clone(), + HashMap::from([ + ("result".into(), text.into()), + ("actor_ref".into(), actor_ref.into()), + ("actor_id".into(), actor_id.into()), + ("host_id".into(), host_id.into()), + ]), + )) } #[derive(Debug, Clone, Parser)] diff --git a/crates/wash-lib/src/cli/stop.rs b/crates/wash-lib/src/cli/stop.rs index be2fe1e0..79b4dd2d 100644 --- a/crates/wash-lib/src/cli/stop.rs +++ b/crates/wash-lib/src/cli/stop.rs @@ -4,15 +4,13 @@ use std::collections::HashMap; use tokio::time::Duration; use crate::{ + actor::stop_actor, cli::{CliConnectionOpts, CommandOutput}, common::boxed_err_to_anyhow, config::WashConnectionOptions, context::default_timeout_ms, id::{validate_contract_id, ModuleId, ServerId, ServiceId}, - wait::{ - wait_for_actor_stop_event, wait_for_provider_stop_event, ActorStoppedInfo, - FindEventOutcome, ProviderStoppedInfo, - }, + wait::{wait_for_provider_stop_event, ActorStoppedInfo, FindEventOutcome, ProviderStoppedInfo}, }; #[derive(Debug, Clone, Parser)] @@ -167,59 +165,36 @@ pub async fn stop_provider(cmd: StopProviderCommand) -> Result { } } -pub async fn stop_actor(cmd: StopActorCommand) -> Result { +pub async fn handle_stop_actor(cmd: StopActorCommand) -> Result { let timeout_ms = cmd.opts.timeout_ms; let wco: WashConnectionOptions = cmd.opts.try_into()?; let client = wco.into_ctl_client(None).await?; - let mut receiver = client - .events_receiver() - .await - .map_err(boxed_err_to_anyhow)?; - - let ack = client - .stop_actor(&cmd.host_id, &cmd.actor_id, cmd.count, None) - .await - .map_err(boxed_err_to_anyhow)?; - - if !ack.accepted { - bail!("Operation failed: {}", ack.error); - } - - if cmd.skip_wait { - let text = format!("Request to stop actor {} received", cmd.actor_id); - return Ok(CommandOutput::new( - text.clone(), - HashMap::from([ - ("result".into(), text.into()), - ("actor_id".into(), cmd.actor_id.to_string().into()), - ("host_id".into(), cmd.host_id.to_string().into()), - ]), - )); - } - - let event = wait_for_actor_stop_event( - &mut receiver, - Duration::from_millis(timeout_ms), - cmd.host_id.to_string(), - cmd.actor_id.to_string(), + let ActorStoppedInfo { actor_id, host_id } = stop_actor( + &client, + &cmd.host_id, + &cmd.actor_id, + cmd.count, + None, + timeout_ms, + cmd.skip_wait, ) .await?; - match event { - FindEventOutcome::Success(ActorStoppedInfo { actor_id, host_id }) => { - let text = format!("Actor [{}] stopped", &actor_id); - Ok(CommandOutput::new( - text.clone(), - HashMap::from([ - ("result".into(), text.into()), - ("actor_id".into(), actor_id.into()), - ("host_id".into(), host_id.into()), - ]), - )) - } - FindEventOutcome::Failure(err) => bail!("{}", err), - } + let text = if cmd.skip_wait { + format!("Request to stop actor {} received", &actor_id) + } else { + format!("Actor [{}] stopped", &actor_id) + }; + + Ok(CommandOutput::new( + text.clone(), + HashMap::from([ + ("result".into(), text.into()), + ("actor_id".into(), actor_id.into()), + ("host_id".into(), host_id.into()), + ]), + )) } pub async fn stop_host(cmd: StopHostCommand) -> Result { diff --git a/crates/wash-lib/src/generate/emoji.rs b/crates/wash-lib/src/generate/emoji.rs index 761b1f6b..4001e205 100644 --- a/crates/wash-lib/src/generate/emoji.rs +++ b/crates/wash-lib/src/generate/emoji.rs @@ -8,7 +8,13 @@ use console::Emoji; pub static ERROR: Emoji<'_, '_> = Emoji("⛔ ", ""); pub static SPARKLE: Emoji<'_, '_> = Emoji("✨ ", ""); +pub static GREEN_CHECK: Emoji<'_, '_> = Emoji("✅ ", ""); pub static WARN: Emoji<'_, '_> = Emoji("⚠️ ", ""); pub static WRENCH: Emoji<'_, '_> = Emoji("🔧 ", ""); pub static SHRUG: Emoji<'_, '_> = Emoji("🤷 ", ""); pub static INFO: Emoji<'_, '_> = Emoji("💡 ", ""); +pub static RECYCLE: Emoji<'_, '_> = Emoji("♻️ ", ""); +pub static INFO_SQUARE: Emoji<'_, '_> = Emoji("ℹ️️ ", ""); +pub static HOURGLASS_DRAINING: Emoji<'_, '_> = Emoji("⏳ ", ""); +pub static HOURGLASS_FULL: Emoji<'_, '_> = Emoji("⌛ ", ""); +pub static CONSTRUCTION_BARRIER: Emoji<'_, '_> = Emoji("🚧 ", ""); diff --git a/crates/wash-lib/src/lib.rs b/crates/wash-lib/src/lib.rs index 12eea664..1231c9b8 100644 --- a/crates/wash-lib/src/lib.rs +++ b/crates/wash-lib/src/lib.rs @@ -28,6 +28,7 @@ pub mod parser; #[cfg(feature = "start")] pub mod start; +pub mod actor; pub mod capture; pub mod common; pub mod config; diff --git a/crates/wash-lib/src/start/mod.rs b/crates/wash-lib/src/start/mod.rs index a9fc353b..54a8f29c 100644 --- a/crates/wash-lib/src/start/mod.rs +++ b/crates/wash-lib/src/start/mod.rs @@ -29,14 +29,14 @@ //! nats_log_file, //! config, //! ).await?; -//! +//! //! // Download wasmCloud if not already installed //! let wasmcloud_executable = ensure_wasmcloud("v0.57.1", &install_dir).await?; -//! +//! //! // Redirect output (which is on stderr) to a log file //! let log_path = install_dir.join("wasmcloud_stderr.log"); //! let log_file = tokio::fs::File::create(&log_path).await?.into_std().await; -//! +//! //! let mut wasmcloud_process = start_wasmcloud_host( //! wasmcloud_executable, //! std::process::Stdio::null(), @@ -45,7 +45,7 @@ //! ).await?; //! //! // Park thread, wasmCloud and NATS are running -//! +//! //! // Terminate processes //! nats_process.kill().await?; //! wasmcloud_process.kill().await?; diff --git a/crates/wash-lib/src/wait.rs b/crates/wash-lib/src/wait.rs index d4d8e416..7bf29308 100644 --- a/crates/wash-lib/src/wait.rs +++ b/crates/wash-lib/src/wait.rs @@ -3,6 +3,8 @@ use cloudevents::event::{AttributesReader, Event}; use tokio::sync::mpsc::Receiver; use tokio::time::{Duration, Instant}; +use crate::actor::ActorStartedInfo; + /// Useful parts of a CloudEvent coming in from the wasmbus. #[derive(Debug)] struct CloudEventData { @@ -102,13 +104,6 @@ async fn find_event( } } -/// Information related to an actor start -pub struct ActorStartedInfo { - pub host_id: String, - pub actor_ref: String, - pub actor_id: String, -} - /// Uses the NATS reciever to read events being published to the wasmCloud lattice event subject, up until the given timeout duration. /// /// If the applicable actor start response event is found (either started or failed to start), the `Ok` variant of the `Result` will be returned, @@ -137,7 +132,7 @@ pub async fn wait_for_actor_start_event( return Ok(EventCheckOutcome::Success(ActorStartedInfo { host_id: host_id.as_str().into(), actor_ref: actor_ref.as_str().into(), - actor_id, + actor_id: Some(actor_id), })); } } diff --git a/flake.nix b/flake.nix index 5ba09d29..91f2b744 100644 --- a/flake.nix +++ b/flake.nix @@ -96,6 +96,7 @@ # - external services running, which would require a more involved setup "tests/integration_build.rs" "tests/integration_claims.rs" + "tests/integration_dev.rs" "tests/integration_get.rs" "tests/integration_inspect.rs" "tests/integration_keys.rs" diff --git a/src/common/start_cmd.rs b/src/common/start_cmd.rs index 20ccec60..b98baa99 100644 --- a/src/common/start_cmd.rs +++ b/src/common/start_cmd.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use wash_lib::cli::start::{start_actor, start_provider, StartCommand}; +use wash_lib::cli::start::{handle_start_actor, start_provider, StartCommand}; use crate::{appearance::spinner::Spinner, CommandOutput, OutputKind}; @@ -15,7 +15,7 @@ pub(crate) async fn handle_command( sp.update_spinner_message(format!(" Starting actor {actor_ref} ... ")); - start_actor(cmd).await? + handle_start_actor(cmd).await? } StartCommand::Provider(cmd) => { let provider_ref = &cmd.provider_ref.to_string(); diff --git a/src/common/stop_cmd.rs b/src/common/stop_cmd.rs index 06758952..192b5856 100644 --- a/src/common/stop_cmd.rs +++ b/src/common/stop_cmd.rs @@ -1,6 +1,6 @@ use anyhow::Result; -use wash_lib::cli::stop::{stop_actor, stop_host, stop_provider, StopCommand}; +use wash_lib::cli::stop::{handle_stop_actor, stop_host, stop_provider, StopCommand}; use crate::{appearance::spinner::Spinner, CommandOutput, OutputKind}; @@ -13,7 +13,7 @@ pub async fn handle_command( StopCommand::Actor(cmd) => { let actor_id = &cmd.actor_id.to_string(); sp.update_spinner_message(format!(" Stopping actor {actor_id} ... ")); - stop_actor(cmd).await? + handle_stop_actor(cmd).await? } StopCommand::Provider(cmd) => { let provider_id = &cmd.provider_id.to_string(); diff --git a/src/ctl/mod.rs b/src/ctl/mod.rs index 080b76c0..36a551cb 100644 --- a/src/ctl/mod.rs +++ b/src/ctl/mod.rs @@ -2,12 +2,13 @@ use anyhow::{bail, Result}; use clap::{Args, Parser, Subcommand}; use std::path::Path; use wash_lib::{ + actor::scale_actor, cli::{ get::{GetClaimsCommand, GetCommand, GetHostInventoryCommand, GetHostsCommand}, labels_vec_to_hashmap, link::LinkCommand, start::StartCommand, - stop::{stop_actor, stop_host, stop_provider, StopCommand}, + stop::{handle_stop_actor, stop_host, stop_provider, StopCommand}, CliConnectionOpts, CommandOutput, OutputKind, }, config::WashConnectionOptions, @@ -190,7 +191,7 @@ pub(crate) async fn handle_command( Stop(StopCommand::Actor(cmd)) => { eprintln!("[warn] `wash ctl stop` has been deprecated in favor of `wash stop` and will be removed in a future version."); sp.update_spinner_message(format!(" Stopping actor {} ... ", cmd.actor_id)); - stop_actor(cmd.clone()).await? + handle_stop_actor(cmd.clone()).await? } Stop(StopCommand::Provider(cmd)) => { sp.update_spinner_message(format!(" Stopping provider {} ... ", cmd.provider_id)); @@ -223,7 +224,7 @@ pub(crate) async fn handle_command( " Scaling Actor {} to {} instances ... ", cmd.actor_id, cmd.count )); - scale_actor(cmd.clone()).await? + handle_scale_actor(cmd.clone()).await? } }; @@ -232,26 +233,21 @@ pub(crate) async fn handle_command( Ok(out) } -pub(crate) async fn scale_actor(cmd: ScaleActorCommand) -> Result { +pub(crate) async fn handle_scale_actor(cmd: ScaleActorCommand) -> Result { let wco: WashConnectionOptions = cmd.opts.try_into()?; let client = wco.into_ctl_client(None).await?; let annotations = labels_vec_to_hashmap(cmd.annotations)?; - let ack = client - .scale_actor( - &cmd.host_id, - &cmd.actor_ref, - &cmd.actor_id, - cmd.count, - Some(annotations), - ) - .await - .map_err(convert_error)?; - - if !ack.accepted { - bail!("Operation failed: {}", ack.error); - } + scale_actor( + &client, + &cmd.host_id, + &cmd.actor_ref, + &cmd.actor_id, + cmd.count, + Some(annotations), + ) + .await?; Ok(CommandOutput::from_key_and_text( "result", diff --git a/src/dev.rs b/src/dev.rs new file mode 100644 index 00000000..db2c118b --- /dev/null +++ b/src/dev.rs @@ -0,0 +1,376 @@ +use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::{path::PathBuf, sync::Arc}; + +use anyhow::{anyhow, bail, Context, Result}; +use clap::Parser; +use console::style; +use notify::{event::EventKind, Event as NotifyEvent, RecursiveMode, Watcher}; +use tokio::task::JoinHandle; +use tokio::time::{timeout, Duration}; +use tokio::{select, sync::mpsc}; +use wash_lib::generate::emoji; +use wash_lib::{ + actor::{scale_actor, start_actor, StartActorArgs}, + build::{build_project, SignConfig}, + cli::dev::run_dev_loop, + cli::CommandOutput, + config::downloads_dir, + id::{ModuleId, ServerId}, + parser::get_config, +}; +use wasmcloud_control_interface::Host; + +use crate::{ + down::{handle_down, DownCommand}, + up::{handle_up, NatsOpts, UpCommand, WadmOpts, WasmcloudOpts, DOWNLOADS_DIR}, +}; + +#[derive(Debug, Clone, Parser)] +pub struct DevCommand { + #[clap(flatten)] + pub(crate) nats_opts: NatsOpts, + + #[clap(flatten)] + pub(crate) wasmcloud_opts: WasmcloudOpts, + + #[clap(flatten)] + pub(crate) wadm_opts: WadmOpts, + + /// ID of the host to use for `wash dev` + /// if one is not selected, `wash dev` will attempt to use the single host in the lattice + #[clap(long = "host-id", name = "host-id", value_parser)] + pub host_id: Option, + + /// Path to code directory + #[clap(name = "code-dir", long = "work-dir", env = "WASH_DEV_CODE_DIR")] + pub code_dir: Option, + + /// Whether to leave the host running after dev + #[clap( + name = "leave-host-running", + long = "leave-host-running", + env = "WASH_DEV_LEAVE_HOST_RUNNING", + default_value = "false", + help = "Leave the wasmCloud host running after stopping the devloop" + )] + pub leave_host_running: bool, + + /// Run the host in a subprocess (rather than detached mode) + #[clap( + name = "use-host-subprocess", + long = "use-host-subprocess", + env = "WASH_DEV_USE_HOST_SUBPROCESS", + default_value = "false", + help = "Run the wasmCloud host in a subprocess (rather than detached mode)" + )] + pub use_host_subprocess: bool, +} + +/// Utility struct for holding a wasmCloud host subprocess. +/// This struct ensures that the join handle is aborted once the +/// subprocess is dropped. +struct HostSubprocess(Option>); + +impl HostSubprocess { + fn into_inner(mut self) -> Option> { + self.0.take() + } +} + +impl Drop for HostSubprocess { + fn drop(&mut self) { + if let Some(handle) = self.0.take() { + handle.abort(); + } + } +} + +/// Handle `wash dev` +pub async fn handle_command( + cmd: DevCommand, + output_kind: wash_lib::cli::OutputKind, +) -> Result { + // Check if host is running + let pid_file = downloads_dir()?.join(DOWNLOADS_DIR); + let existing_instance = tokio::fs::metadata(pid_file).await.is_ok(); + + let mut host_subprocess: Option = None; + + // Start host if it's not already running + if !existing_instance { + eprintln!( + "{} {}{}", + emoji::WARN, + style("No running wasmcloud host detected (PID file missing), ").bold(), + style("starting a new host...").bold() + ); + // Ensure that file loads are allowed + let mut wasmcloud_opts = cmd.wasmcloud_opts.clone(); + wasmcloud_opts.allow_file_load = Some(true); + + if cmd.use_host_subprocess { + // Use a subprocess + eprintln!( + "{} {}", + emoji::WRENCH, + style("starting wasmCloud host subprocess...").bold(), + ); + let nats_opts = cmd.nats_opts.clone(); + let wadm_opts = cmd.wadm_opts.clone(); + host_subprocess = Some(HostSubprocess(Some(tokio::spawn(async move { + let _ = handle_up( + UpCommand { + detached: false, + nats_opts, + wasmcloud_opts, + wadm_opts, + }, + output_kind, + ) + .await; + eprintln!( + "{} {}", + emoji::WRENCH, + style("shutting down host subprocess...").bold(), + ); + })))); + + // Wait a while for wasmcloud to start up + tokio::time::sleep(Duration::from_secs(5)).await; + } else { + // Run a detached process via running the equivalent of `wash up` + + // Run wash up to start the host if not already running + let _ = handle_up( + UpCommand { + detached: true, + nats_opts: cmd.nats_opts, + wasmcloud_opts, + wadm_opts: cmd.wadm_opts, + }, + output_kind, + ) + .await?; + } + + eprintln!( + "{} {}", + emoji::WRENCH, + style("Successfully started wasmCloud instance").bold(), + ); + } + + // Connect to the wasmcloud instance + let ctl_client = Arc::new( + cmd.wasmcloud_opts + .into_ctl_client(None) + .await + .context("failed to create wasmcloud control client")?, + ); + let wait_ctl_client = ctl_client.clone(); + + // If we started our own instance, wait for one host to be present + if !existing_instance { + eprintln!("⏳ "); + eprintln!( + "{} {}", + emoji::HOURGLASS_DRAINING, + style("Waiting for host to become reachable...").bold(), + ); + + // Wait for up to a minute to find the host + let _ = timeout( + Duration::from_secs(60), + tokio::spawn(async move { + loop { + match wait_ctl_client.get_hosts().await { + Ok(hs) => match &hs[..] { + [] => {} + [h] => { + eprintln!( + "{} {}", + emoji::GREEN_CHECK, + style(format!("Found single host w/ ID [{}]", h.id)).bold(), + ); + break Ok(()); + } + _hs => { + bail!("Detected an unexpected number (>1) of hosts present."); + } + }, + Err(e) => { + eprintln!( + "{} {}", + emoji::WARN, + style(format!("Failed to get hosts (will retry in 5s): {e}")) + .bold(), + ); + } + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + }), + ) + .await + .context("wasmCloud host did not become reachable")?; + } + + // Refresh host information (used in particular for existing instances) + let hosts = ctl_client + .get_hosts() + .await + .or_else(|e| bail!("failed to retrieve hosts from lattice: {e}"))?; + let host: Host = match &hosts[..] { + [] => bail!("0 hosts detected, is wasmCloud running?"), + [h] => h.clone(), + _ => { + if let Some(host_id) = cmd.host_id.map(ServerId::into_string) { + hosts + .into_iter() + .find(|h| h.id == host_id) + .ok_or_else(|| anyhow!("failed to find host [{host_id}]"))? + } else { + bail!( + "{} hosts detected, please specify the host on which to deploy with --host-id", + hosts.len() + ) + } + } + }; + + // Resolve project configuration from the current path + let current_dir = std::env::current_dir()?; + let project_path = cmd.code_dir.unwrap_or(current_dir); + let project_cfg = get_config(Some(project_path.clone()), Some(true))?; + + // Build the project (equivalent to `wash build`) + let sign_cfg: Option = Some(SignConfig { + keys_directory: None, + issuer: None, + subject: None, + disable_keygen: false, + }); + eprintln!( + "{} {}", + emoji::CONSTRUCTION_BARRIER, + style("Starting project build").bold(), + ); + + // Build the project + let artifact_path = build_project(&project_cfg, sign_cfg.clone())?.canonicalize()?; + eprintln!( + "✅ successfully built project at [{}]", + artifact_path.display() + ); + + // Since we're using the actor from file on disk, the ref should be the file path (canonicalized) on disk as URI + let actor_ref = format!("file://{}", artifact_path.display()); + let actor_id; + + // Attempt to find or create the actor, scaling any existing actors to zero if it exists + let inventory = ctl_client.get_host_inventory(&host.id).await.or_else(|e| { + bail!( + "failed to retrieve host inventory for host [{}]: {e}", + &host.id + ) + })?; + if let Some(existing_actor) = inventory + .actors + .into_iter() + .find(|a| a.image_ref == Some(actor_ref.clone())) + { + actor_id = existing_actor.id; + scale_actor(&ctl_client, &host.id, &actor_ref, &actor_id, 1, None).await?; + } else { + // Start the actor for the first time + actor_id = start_actor(StartActorArgs { + ctl_client: &ctl_client, + host_id: &host.id, + actor_ref: &actor_ref, + count: 1, + skip_wait: false, + timeout_ms: None, + }) + .await? + .actor_id + .ok_or_else(|| anyhow!("failed to do thing"))?; + } + + // Set up a oneshot channel to remove + let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); + let (reload_tx, mut reload_rx) = mpsc::channel::<()>(1); + + // Handle Ctrl + c with Tokio + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .context("failed to wait for ctrl_c signal")?; + stop_tx + .send(()) + .await + .context("failed to send stop signal after receiving Ctrl + c")?; + Result::<_, anyhow::Error>::Ok(()) + }); + + // Enable/disable watching to prevent having the output artifact trigger a rebuild + let pause_watch = Arc::new(AtomicBool::new(false)); + let watcher_paused = pause_watch.clone(); + + // Spawn a file watcher to listen for changes and send on reload_tx + let mut watcher = notify::recommended_watcher(move |res: _| match res { + Ok(event) => match event { + NotifyEvent { + kind: EventKind::Create(_), + .. + } + | NotifyEvent { + kind: EventKind::Modify(_), + .. + } + | NotifyEvent { + kind: EventKind::Remove(_), + .. + } => { + // If watch has been paused for any reason, skip notifications + if watcher_paused.load(Ordering::SeqCst) { + return; + } + + let _ = reload_tx.blocking_send(()); + } + _ => {} + }, + Err(e) => { + eprintln!("[error] watch failed: {:?}", e); + } + })?; + watcher.watch(&project_path.clone(), RecursiveMode::Recursive)?; + + // Watch FS for changes and listen for Ctrl + C in tandem + eprintln!("👀 watching for file changes (press Ctrl+c to stop)..."); + loop { + select! { + _ = reload_rx.recv() => { + pause_watch.store(true, Ordering::SeqCst); + run_dev_loop(&project_cfg, ModuleId::from_str(&actor_id)?, &actor_ref, ServerId::from_str(&host.id)?, &ctl_client, sign_cfg.clone()).await?; + pause_watch.store(false, Ordering::SeqCst); + eprintln!("👀 watching for file changes (press Ctrl+c to stop)..."); + }, + _ = stop_rx.recv() => { + pause_watch.store(true, Ordering::SeqCst); + eprintln!("🛑 received Ctrl + c, stopping devloop..."); + + if !cmd.leave_host_running { + eprintln!("⏳ stopping wasmCloud instance..."); + handle_down(DownCommand::default(), output_kind).await?; + if let Some(handle) = host_subprocess.and_then(|hs| hs.into_inner()) { + handle.await?; + } + } + + break Ok(CommandOutput::default()); + }, + } + } +} diff --git a/src/down/mod.rs b/src/down/mod.rs index 416d91f1..06fefedd 100644 --- a/src/down/mod.rs +++ b/src/down/mod.rs @@ -21,7 +21,7 @@ use crate::up::{ }; use crate::util::nats_client_from_opts; -#[derive(Parser, Debug, Clone)] +#[derive(Parser, Debug, Clone, Default)] pub(crate) struct DownCommand { /// A lattice prefix is a unique identifier for a lattice, and is frequently used within NATS topics to isolate messages from different lattices #[clap( diff --git a/src/main.rs b/src/main.rs index ce70bb1a..47436a0a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use anyhow::Result; +use dev::DevCommand; use serde_json::json; use smithy::{GenerateCli, LintCli, ValidateCli}; use wash_lib::{ @@ -41,6 +42,7 @@ mod common; mod completions; mod ctl; mod ctx; +mod dev; mod down; mod drain; mod generate; @@ -72,6 +74,7 @@ Applications: Projects: build Build (and sign) a wasmCloud actor, provider, or interface claims Generate and manage JWTs for wasmCloud actors + dev Run a actor development loop (experimental) gen Generate code from smithy IDL files inspect Inspect capability provider or actor module lint Perform lint checks on smithy models @@ -152,6 +155,9 @@ enum CliCommand { /// Manage wasmCloud host configuration contexts #[clap(name = "ctx", subcommand)] Ctx(CtxCommand), + /// (experimental) Run a local development loop for an actor + #[clap(name = "dev")] + Dev(DevCommand), /// Tear down a wasmCloud environment launched with wash up #[clap(name = "down")] Down(DownCommand), @@ -237,6 +243,13 @@ async fn main() { } CliCommand::Ctl(ctl_cli) => ctl::handle_command(ctl_cli, output_kind).await, CliCommand::Ctx(ctx_cli) => ctx::handle_command(ctx_cli).await, + CliCommand::Dev(dev_cli) => { + if cli.experimental { + dev::handle_command(dev_cli, output_kind).await + } else { + experimental_error_message("dev") + } + } CliCommand::Down(down_cli) => down::handle_command(down_cli, output_kind).await, CliCommand::Drain(drain_cli) => drain::handle_command(drain_cli), CliCommand::Get(get_cli) => common::get_cmd::handle_command(get_cli, output_kind).await, diff --git a/src/up/mod.rs b/src/up/mod.rs index 2e274d78..7b2d872f 100644 --- a/src/up/mod.rs +++ b/src/up/mod.rs @@ -8,7 +8,7 @@ use std::sync::{ Arc, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use async_nats::Client; use clap::Parser; use serde_json::json; @@ -19,6 +19,8 @@ use tokio::{ process::Child, }; use wash_lib::cli::{CommandOutput, OutputKind}; +use wash_lib::config::downloads_dir; +use wash_lib::config::DEFAULT_NATS_TIMEOUT_MS; use wash_lib::start::ensure_wadm; use wash_lib::start::find_wasmcloud_binary; use wash_lib::start::nats_pid_path; @@ -28,9 +30,9 @@ use wash_lib::start::{ ensure_nats_server, ensure_wasmcloud, start_nats_server, start_wasmcloud_host, wait_for_server, NatsConfig, }; +use wasmcloud_control_interface::{Client as CtlClient, ClientBuilder as CtlClientBuilder}; use crate::appearance::spinner::Spinner; -use crate::cfg::cfg_dir; use crate::down::stop_nats; use crate::util::nats_client_from_opts; @@ -262,6 +264,49 @@ pub(crate) struct WasmcloudOpts { pub(crate) start_only: bool, } +impl WasmcloudOpts { + pub async fn into_ctl_client(self, auction_timeout_ms: Option) -> Result { + let lattice_prefix = self.lattice_prefix; + let ctl_host = self + .ctl_host + .unwrap_or_else(|| DEFAULT_NATS_HOST.to_string()); + let ctl_port = self.ctl_port.unwrap_or(4222).to_string(); + let auction_timeout_ms = auction_timeout_ms.unwrap_or(DEFAULT_NATS_TIMEOUT_MS); + + let nc = nats_client_from_opts( + &ctl_host, + &ctl_port, + self.ctl_jwt, + self.ctl_seed, + self.ctl_credsfile, + ) + .await + .context("Failed to create NATS client")?; + + let mut builder = CtlClientBuilder::new(nc) + .lattice_prefix(lattice_prefix) + .rpc_timeout(tokio::time::Duration::from_millis( + self.rpc_timeout_ms.into(), + )) + .auction_timeout(tokio::time::Duration::from_millis(auction_timeout_ms)); + + if let Some(js_domain) = self.wasmcloud_js_domain { + builder = builder.js_domain(js_domain); + } + + if let Ok(topic_prefix) = std::env::var("WASMCLOUD_CTL_TOPIC_PREFIX") { + builder = builder.topic_prefix(topic_prefix); + } + + let ctl_client = builder + .build() + .await + .map_err(|err| anyhow!("Failed to create control interface client: {err:?}"))?; + + Ok(ctl_client) + } +} + #[derive(Parser, Debug, Clone)] pub(crate) struct WadmOpts { /// wadm version to download, e.g. `v0.4.0`. See https://github.com/wasmCloud/wadm/releases for releases @@ -280,7 +325,7 @@ pub(crate) async fn handle_command( } pub(crate) async fn handle_up(cmd: UpCommand, output_kind: OutputKind) -> Result { - let install_dir = cfg_dir()?.join(DOWNLOADS_DIR); + let install_dir = downloads_dir()?; create_dir_all(&install_dir).await?; let spinner = Spinner::new(&output_kind)?; @@ -548,15 +593,19 @@ async fn run_wasmcloud_interactive( let (running_sender, running_receiver) = channel(); let running = Arc::new(AtomicBool::new(true)); - ctrlc::set_handler(move || { + // Handle Ctrl + c with Tokio + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .context("failed to wait for ctrl_c signal")?; if running.load(Ordering::SeqCst) { running.store(false, Ordering::SeqCst); let _ = running_sender.send(true); } else { log::warn!("\nRepeated CTRL+C received, killing wasmCloud and NATS. This may result in zombie processes") } - }) - .expect("Error setting Ctrl-C handler, please file a bug issue https://github.com/wasmCloud/wash/issues/new/choose"); + Result::<_, anyhow::Error>::Ok(()) + }); if output_kind != OutputKind::Json { println!("🏃 Running in interactive mode, your host is running at http://localhost:{port}",); diff --git a/tests/common.rs b/tests/common.rs index 9ac571a4..8b2c279b 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -7,7 +7,9 @@ use std::{ use anyhow::{bail, Context, Result}; use rand::{distributions::Alphanumeric, Rng}; +use tempfile::TempDir; use tokio::net::TcpStream; +use tokio::process::Command; use sysinfo::SystemExt; use tokio::process::Child; @@ -65,7 +67,7 @@ pub(crate) fn test_dir_file(subfolder: &str, file: &str) -> PathBuf { } #[allow(unused)] -async fn start_nats(port: u16, nats_install_dir: &PathBuf) -> Result { +pub(crate) async fn start_nats(port: u16, nats_install_dir: &PathBuf) -> Result { let nats_binary = ensure_nats_server("v2.8.4", nats_install_dir).await?; let config = NatsConfig::new_standalone("127.0.0.1", port, None); start_nats_server(nats_binary, std::process::Stdio::null(), config).await @@ -209,3 +211,67 @@ impl TestWashInstance { }) } } + +pub(crate) struct TestSetup { + /// The path to the directory for the test. + /// Added here so that the directory is not deleted until the end of the test. + #[allow(dead_code)] + pub test_dir: TempDir, + /// The path to the created actor's directory. + #[allow(dead_code)] + pub project_dir: PathBuf, +} + +#[allow(dead_code)] +pub(crate) struct WorkspaceTestSetup { + /// The path to the directory for the test. + /// Added here so that the directory is not deleted until the end of the test. + #[allow(dead_code)] + pub test_dir: TempDir, + /// The path to the created actor's directory. + #[allow(dead_code)] + pub project_dirs: Vec, +} + +/// Inits an actor build test by setting up a test directory and creating an actor from a template. +/// Returns the paths of the test directory and actor directory. +#[allow(dead_code)] +pub(crate) async fn init(actor_name: &str, template_name: &str) -> Result { + let test_dir = TempDir::new()?; + std::env::set_current_dir(&test_dir)?; + let project_dir = init_actor_from_template(actor_name, template_name).await?; + std::env::set_current_dir(&project_dir)?; + Ok(TestSetup { + test_dir, + project_dir, + }) +} + +/// Initializes a new actor from a wasmCloud template, and sets the environment to use the created actor's directory. +#[allow(dead_code)] +pub(crate) async fn init_actor_from_template( + actor_name: &str, + template_name: &str, +) -> Result { + let status = Command::new(env!("CARGO_BIN_EXE_wash")) + .args([ + "new", + "actor", + actor_name, + "--git", + "wasmcloud/project-templates", + "--subfolder", + &format!("actor/{template_name}"), + "--silent", + "--no-git-init", + ]) + .kill_on_drop(true) + .status() + .await + .context("Failed to generate project")?; + + assert!(status.success()); + + let project_dir = std::env::current_dir()?.join(actor_name); + Ok(project_dir) +} diff --git a/tests/integration_build.rs b/tests/integration_build.rs index a5081073..fc504cd1 100644 --- a/tests/integration_build.rs +++ b/tests/integration_build.rs @@ -6,6 +6,8 @@ use tokio::process::Command; mod common; +use crate::common::{init, init_actor_from_template, WorkspaceTestSetup}; + #[tokio::test] #[serial] async fn build_rust_actor_unsigned_serial() -> Result<()> { @@ -134,37 +136,6 @@ async fn build_tinygo_actor_signed_serial() -> Result<()> { Ok(()) } -struct TestSetup { - /// The path to the directory for the test. - /// Added here so that the directory is not deleted until the end of the test. - #[allow(dead_code)] - test_dir: TempDir, - /// The path to the created actor's directory. - project_dir: PathBuf, -} - -struct WorkspaceTestSetup { - /// The path to the directory for the test. - /// Added here so that the directory is not deleted until the end of the test. - #[allow(dead_code)] - test_dir: TempDir, - /// The path to the created actor's directory. - project_dirs: Vec, -} - -/// Inits an actor build test by setting up a test directory and creating an actor from a template. -/// Returns the paths of the test directory and actor directory. -async fn init(actor_name: &str, template_name: &str) -> Result { - let test_dir = TempDir::new()?; - std::env::set_current_dir(&test_dir)?; - let project_dir = init_actor_from_template(actor_name, template_name).await?; - std::env::set_current_dir(&project_dir)?; - Ok(TestSetup { - test_dir, - project_dir, - }) -} - /// Inits an actor build test by setting up a test directory and creating an actor from a template. /// Returns the paths of the test directory and actor directory. async fn init_workspace(actor_names: Vec<&str>) -> Result { @@ -200,31 +171,6 @@ async fn init_workspace(actor_names: Vec<&str>) -> Result { }) } -/// Initializes a new actor from a wasmCloud template, and sets the environment to use the created actor's directory. -async fn init_actor_from_template(actor_name: &str, template_name: &str) -> Result { - let status = Command::new(env!("CARGO_BIN_EXE_wash")) - .args([ - "new", - "actor", - actor_name, - "--git", - "wasmcloud/project-templates", - "--subfolder", - &format!("actor/{template_name}"), - "--silent", - "--no-git-init", - ]) - .kill_on_drop(true) - .status() - .await - .context("Failed to generate project")?; - - assert!(status.success()); - - let project_dir = std::env::current_dir()?.join(actor_name); - Ok(project_dir) -} - #[tokio::test] #[serial] async fn integration_build_handles_dashed_names_serial() -> Result<()> { diff --git a/tests/integration_dev.rs b/tests/integration_dev.rs new file mode 100644 index 00000000..4958f84c --- /dev/null +++ b/tests/integration_dev.rs @@ -0,0 +1,90 @@ +use std::{collections::HashMap, sync::Arc}; + +use anyhow::{anyhow, bail}; +#[cfg(target_family = "unix")] +use anyhow::{Context, Result}; +use serial_test::serial; +use tokio::{process::Command, sync::RwLock, time::Duration}; + +mod common; + +use crate::common::{init, start_nats, test_dir_with_subfolder}; + +#[tokio::test] +#[serial] +async fn integration_dev_hello_actor_serial() -> Result<()> { + let test_setup = init( + /* actor_name= */ "hello", /* template_name= */ "hello", + ) + .await?; + let project_dir = test_setup.project_dir; + + let dir = test_dir_with_subfolder("dev_hello_actor"); + let mut nats = start_nats(5895, &dir).await?; + + let dev_cmd = Arc::new(RwLock::new( + Command::new(env!("CARGO_BIN_EXE_wash")) + .args([ + "dev", + "--nats-port", + "5895", + "--nats-connect-only", + "--ctl-port", + "5895", + "--use-host-subprocess", + ]) + .kill_on_drop(true) + .envs(HashMap::from([("WASH_EXPERIMENTAL", "true")])) + .spawn() + .context("failed running cargo dev")?, + )); + let watch_dev_cmd = dev_cmd.clone(); + + let signed_file_path = Arc::new(project_dir.join("build/hello_s.wasm")); + let expected_path = signed_file_path.clone(); + + // Wait until the signed file is there (this means dev succeeded) + let _ = tokio::time::timeout( + Duration::from_secs(1200), + tokio::spawn(async move { + loop { + // If the command failed (and exited early), bail + if let Ok(Some(exit_status)) = watch_dev_cmd.write().await.try_wait() { + if !exit_status.success() { + bail!("dev command failed"); + } + } + // If the file got built, we know dev succeeded + if expected_path.exists() { + break Ok(()); + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + }), + ) + .await + .context("timed out while waiting for file path to get created")?; + assert!(signed_file_path.exists(), "signed actor file was built",); + + let process_pid = dev_cmd + .write() + .await + .id() + .context("failed to get child process pid")?; + + // Send ctrl + c signal to stop the process + // send SIGINT to the child + nix::sys::signal::kill( + nix::unistd::Pid::from_raw(process_pid as i32), + nix::sys::signal::Signal::SIGINT, + ) + .expect("cannot send ctrl-c"); + + // Wait until the process stops + let _ = tokio::time::timeout(Duration::from_secs(15), dev_cmd.write().await.wait()) + .await + .context("dev command did not exit")?; + + nats.kill().await.map_err(|e| anyhow!(e))?; + Ok(()) +} diff --git a/tests/integration_up.rs b/tests/integration_up.rs index 0647e213..ae89e77b 100644 --- a/tests/integration_up.rs +++ b/tests/integration_up.rs @@ -1,18 +1,16 @@ use serial_test::serial; -use std::{ - fs::{read_to_string, remove_dir_all}, - path::PathBuf, -}; +use std::fs::{read_to_string, remove_dir_all}; use anyhow::{anyhow, Context, Result}; use common::test_dir_with_subfolder; use regex::Regex; use sysinfo::{ProcessExt, SystemExt}; -use tokio::process::{Child, Command}; -use wash_lib::start::{ensure_nats_server, start_nats_server, NatsConfig}; +use tokio::process::Command; mod common; +use common::start_nats; + const RGX_ACTOR_START_MSG: &str = r"Actor \[(?P[^]]+)\] \(ref: \[(?P[^]]+)\]\) started on host \[(?P[^]]+)\]"; #[tokio::test] @@ -290,9 +288,3 @@ async fn integration_up_doesnt_kill_unowned_nats_serial() -> Result<()> { remove_dir_all(dir).unwrap(); Ok(()) } - -async fn start_nats(port: u16, nats_install_dir: &PathBuf) -> Result { - let nats_binary = ensure_nats_server("v2.8.4", nats_install_dir).await?; - let config = NatsConfig::new_standalone("127.0.0.1", port, None); - start_nats_server(nats_binary, std::process::Stdio::null(), config).await -}