diff --git a/.github/workflows/tests_and_checks.yml b/.github/workflows/tests_and_checks.yml index fa7e9723..e0d78fcb 100644 --- a/.github/workflows/tests_and_checks.yml +++ b/.github/workflows/tests_and_checks.yml @@ -46,7 +46,7 @@ jobs: run-checks: needs: changes - if: ${{ needs.changes.outputs.rust == 'true' || needs.changes.outputs.examples == 'true' }} + if: ${{ needs.changes.outputs.rust == 'true' }} env: SCCACHE_GHA_ENABLED: "true" RUSTC_WRAPPER: "sccache" @@ -210,7 +210,7 @@ jobs: needs: changes if: ${{ needs.changes.outputs.rust == 'true' }} env: - RUSTFLAGS: -Dwarnings -Ctarget-feature=+crt-static + RUSTFLAGS: -Dwarnings -Ctarget-feature=+crt-static strategy: fail-fast: false matrix: @@ -262,6 +262,9 @@ jobs: run-cargo-tests: needs: changes if: ${{ needs.changes.outputs.rust == 'true' }} + env: + SCCACHE_GHA_ENABLED: "true" + RUSTC_WRAPPER: "sccache" runs-on: ubuntu-latest steps: - name: Setup IPFS @@ -283,11 +286,14 @@ jobs: shared-key: test-all-stable-ubuntu-latest save-if: ${{ github.event_name == 'push' }} + - name: Sccache + uses: mozilla-actions/sccache-action@v0.0.3 + - name: Run Tests (all-features) run: cargo test --workspace --all-features run-docs: - needs: changes + needs: [changes] if: ${{ needs.changes.outputs.rust == 'true' }} env: SCCACHE_GHA_ENABLED: "true" @@ -306,6 +312,7 @@ jobs: - name: Cache Project uses: Swatinem/rust-cache@v2 with: + cache-on-failure: true shared-key: doc save-if: ${{ github.event_name == 'push' }} @@ -316,3 +323,42 @@ jobs: env: RUSTDOCFLAGS: -Dwarnings run: cargo doc --workspace --document-private-items + + build-and-run-examples: + needs: changes + if: ${{ needs.changes.outputs.examples == 'true' }} + env: + SCCACHE_GHA_ENABLED: "true" + RUSTC_WRAPPER: "sccache" + runs-on: ubuntu-latest + steps: + - name: Setup IPFS + uses: ibnesayeed/setup-ipfs@master + with: + run_daemon: false + + - name: Checkout Repository + uses: actions/checkout@v4 + + - name: Use mold-linker + uses: rui314/setup-mold@v1 + + - name: 
Install Rust Toolchain + uses: dtolnay/rust-toolchain@nightly + + - name: Cache Project + uses: Swatinem/rust-cache@v2 + with: + cache-on-failure: true + shared-key: cargo-examples + save-if: ${{ github.event_name == 'push' }} + + - name: Sccache + uses: mozilla-actions/sccache-action@v0.0.3 + + - name: Build example-websocket-relay + run: cargo build -p websocket-relay + + - name: Run example-websocket-relay + shell: bash + run: timeout 10s cargo run -p websocket-relay || true diff --git a/Cargo.lock b/Cargo.lock index b40c7dff..241e49ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -780,6 +780,20 @@ dependencies = [ "serde_json", ] +[[package]] +name = "cargo_metadata" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "cast" version = "0.3.0" @@ -2548,6 +2562,7 @@ dependencies = [ "flume 0.11.0", "fnv", "futures", + "hickory-resolver", "homestar-invocation", "homestar-runtime-tests-proc-macro", "homestar-wasm", @@ -2615,6 +2630,7 @@ dependencies = [ "typetag", "url", "uuid", + "vergen", "wait-timeout", "winapi", ] @@ -2721,6 +2737,7 @@ dependencies = [ "libsecp256k1-core", "libsqlite3-sys", "linux-raw-sys", + "memchr", "miette", "miniz_oxide", "multibase", @@ -2735,7 +2752,9 @@ dependencies = [ "ring 0.17.7", "rustc-hash", "rustix", + "rustls 0.21.10", "scopeguard", + "semver", "serde", "serde_json", "sha2 0.10.8", @@ -3289,7 +3308,7 @@ dependencies = [ "tokio-util", "tracing", "url", - "webpki-roots", + "webpki-roots 0.26.0", ] [[package]] @@ -3545,6 +3564,7 @@ dependencies = [ "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", + "libp2p-websocket", "libp2p-yamux", "multiaddr 0.18.1", "pin-project", @@ -3690,6 +3710,7 @@ dependencies = [ "multihash 0.19.1", "quick-protobuf", "rand", + "ring 0.17.7", "serde", "sha2 0.10.8", 
"thiserror", @@ -3949,6 +3970,26 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-websocket" +version = "0.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4846d51afd08180e164291c3754ba30dd4fbac6fac65571be56403c16431a5e" +dependencies = [ + "either", + "futures", + "futures-rustls", + "libp2p-core", + "libp2p-identity", + "parking_lot", + "pin-project-lite", + "rw-stream-sink", + "soketto", + "tracing", + "url", + "webpki-roots 0.25.3", +] + [[package]] name = "libp2p-yamux" version = "0.45.1" @@ -4658,6 +4699,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "object" version = "0.32.2" @@ -6184,7 +6234,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" dependencies = [ "bytecount", - "cargo_metadata", + "cargo_metadata 0.14.2", "error-chain", "glob", "pulldown-cmark", @@ -6687,6 +6737,8 @@ checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" dependencies = [ "deranged", "itoa", + "libc", + "num_threads", "powerfmt", "serde", "time-core", @@ -7380,6 +7432,20 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bda7c41ca331fe9a1c278a9e7ee055f4be7f5eb1c2b72f079b4ff8b5fce9d5c" +[[package]] +name = "vergen" +version = "8.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e27d6bdd219887a9eadd19e1c34f32e47fa332301184935c6d9bca26f3cca525" +dependencies = [ + "anyhow", + "cargo_metadata 0.18.1", + "cfg-if", + "regex", + "rustversion", + "time", +] + [[package]] name = "version_check" version = "0.9.4" @@ -8049,6 +8115,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" 
+version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" + [[package]] name = "webpki-roots" version = "0.26.0" diff --git a/examples/websocket-relay/README.md b/examples/websocket-relay/README.md index ef5c690e..c52c0a64 100644 --- a/examples/websocket-relay/README.md +++ b/examples/websocket-relay/README.md @@ -35,7 +35,7 @@ To get started, please install: ## Usage -1. Run `cargo run -- start` to start the runtime and an IPFS daemon as a +1. Run `cargo run` to start the runtime and an IPFS daemon as a background process. This runtime includes ANSI-coded logging by default. 2. In a separate terminal window, run `npm install --prefix relay-app` to @@ -61,38 +61,45 @@ if they've been previously run. ## Tips & Common Issues -On macOS, for example, a simple homebrew install would install everything you -need: `brew install rust npm ipfs` +- On macOS, for example, a simple homebrew install would install everything you + need: `brew install rust npm ipfs`. -We have packaged homestar binaries using brew, so -`brew install fission-codes/fission/homestar` will install everything you need, -including `ipfs`. You will still need `npm` to run this example. From this folder, -you can then run the example like this: +- Running `homestar` using `cargo run` requires a minimum Rust version of + `1.73.0`. If you've got an older version of rust, update it with + `rustup update`. -``` -homestar start --db homestar.db -``` +- You do not have to start Kubo (IPFS) on your own. The example will do this + for you, and use `examples/websocket-relay/tmp/.ipfs` as a local blockstore. + Feel free to discard it when you don't need it. -Running `homestar` via `cargo run` requires a minimum Rust version of -`1.73.0`. If you've got an older install of rust, update it with -`rustup update`. 
+- If you're already running an IPFS instance, for example [IPFS Desktop][ipfs-desktop], + the application will check for it and not start a new, local one. + However, the application expects a default IPFS host and port. The expected + IPFS `host` and `port` can be updated in the `homestar` network settings: -You do not have to start Kubo (IPFS) on your own. The example will do this -for you, and use `examples/websocket-relay/tmp/.ipfs` as a local blockstore. -Feel free to discard it when you don't need it. + ``` toml + [node] -If you're already running an IPFS instance, for example [IPFS Desktop][ipfs-desktop], -the application will check for it and not start a new, local one. -However, the application expects a default IPFS host and port. The expected -IPFS `host` and `port` can be updated in the `homestar` network settings: + [node.network.ipfs] + host = "127.0.0.1" + port = 5001 + ``` -``` toml -[node] +- We have officially packaged homestar binaries using brew, so + `brew install fission-codes/fission/homestar` will install mostly everything + you need, including `ipfs`. You will still need `npm` to run this example, and + you'll have to manually `ipfs add` the `synthcat.png` and `example_test.wasm` + files located in this directory. Then, from this folder, you can run the + example like this: + + ``` + homestar start --db homestar.db + ``` + + Afterward, run `npm install --prefix relay-app` to install dependencies + and `npm run --prefix relay-app dev` to start the relay web + application (UI) on `http://localhost:5173/` by default. 
-[node.network.ipfs] -host = "127.0.0.1" -port = 5001 -``` [@fission-codes/homestar]: https://www.npmjs.com/package/@fission-codes/homestar [install-ipfs]: https://docs.ipfs.tech/install/ diff --git a/examples/websocket-relay/src/main.rs b/examples/websocket-relay/src/main.rs index da16636d..37f9600a 100644 --- a/examples/websocket-relay/src/main.rs +++ b/examples/websocket-relay/src/main.rs @@ -2,7 +2,9 @@ use homestar_runtime::{db::Database, Db, Logger, Runner, Settings}; use miette::Result; use retry::{delay::Fixed, retry}; use std::{ + fs, net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}, + path::PathBuf, process::{Child, Command, Stdio}, }; use sysinfo::{System, SystemExt}; @@ -37,6 +39,7 @@ fn main() -> Result<()> { } } + #[cfg(not(feature = "ci"))] Ok(()) } @@ -47,6 +50,7 @@ fn ipfs_setup() -> Option { println!("`ipfs` was found!"); None } else { + let _ = fs::create_dir("./tmp"); let mut ipfs_daemon = Command::new("ipfs") .args(["--repo-dir", "./tmp/.ipfs", "--offline", "daemon", "--init"]) .stderr(Stdio::piped()) @@ -77,7 +81,16 @@ fn ipfs_setup() -> Option { let mut add_image_args = args.clone(); let mut add_wasm_args = args.clone(); - add_image_args.append(&mut vec!["add", "--cid-version", "1", "./synthcat.png"]); + let mut image_file = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + image_file.push("synthcat.png"); + let image_file = fs::canonicalize(image_file).expect("synthcat.png to be found"); + + add_image_args.append(&mut vec![ + "add", + "--cid-version", + "1", + image_file.to_str().unwrap(), + ]); let ipfs_add_img = Command::new("ipfs") .args(add_image_args) @@ -87,11 +100,15 @@ fn ipfs_setup() -> Option { println!("synthcat.png added to local IPFS instance"); + let mut wasm_file = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + wasm_file.push("example_test.wasm"); + let wasm_file = fs::canonicalize(wasm_file).expect("example_test.wasm to be found"); + add_wasm_args.append(&mut vec![ "add", "--cid-version", "1", - "./example_test.wasm", + 
wasm_file.to_str().unwrap(), ]); let ipfs_add_wasm = Command::new("ipfs") diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index abc672b9..057c2d0a 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -67,6 +67,7 @@ faststr = { workspace = true } flume = { version = "0.11", default-features = false, features = ["async"] } fnv = { version = "1.0", default-features = false } futures = { workspace = true } +hickory-resolver = { version = "0.24", default-features = false } homestar-invocation = { version = "0.1", path = "../homestar-invocation", features = [ "diesel", ] } @@ -88,6 +89,7 @@ jsonrpsee = { version = "0.21", default-features = false, features = [ ] } libipld = { workspace = true } libp2p = { version = "0.53", default-features = false, features = [ + "dns", "kad", "request-response", "rendezvous", @@ -97,12 +99,15 @@ libp2p = { version = "0.53", default-features = false, features = [ "mdns", "gossipsub", "request-response", + "rsa", "tokio", "tcp", "noise", "cbor", "yamux", "serde", + "quic", + "websocket", ] } libsqlite3-sys = { workspace = true } maplit = "1.0" @@ -185,6 +190,14 @@ uuid = { version = "1.6.1", features = ["v4"] } [target.'cfg(not(windows))'.dependencies] daemonize = "0.5" +[build-dependencies] +vergen = { version = "8.3", default-features = false, features = [ + "build", + "cargo", + "git", + "gitcl", +] } + [dev-dependencies] assert_cmd = "2.0" criterion = "0.5" diff --git a/homestar-runtime/build.rs b/homestar-runtime/build.rs index 3a8149ef..5b672ed4 100644 --- a/homestar-runtime/build.rs +++ b/homestar-runtime/build.rs @@ -1,3 +1,14 @@ -fn main() { +fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=migrations"); + + vergen::EmitBuilder::builder() + .fail_on_error() + .use_local_build() + .git_sha(true) + .cargo_features() + .emit()?; + + println!("cargo:rerun-if-changed=build.rs"); + + Ok(()) } diff --git a/homestar-runtime/config/defaults.toml 
b/homestar-runtime/config/defaults.toml index f2d65c53..dee1e282 100644 --- a/homestar-runtime/config/defaults.toml +++ b/homestar-runtime/config/defaults.toml @@ -27,6 +27,10 @@ transport_connection_timeout = 60 max_connected_peers = 32 max_announce_addresses = 10 dial_interval = 30 +bootstrap_interval = 30 + +[node.network.libp2p.quic] +enable = true [node.network.libp2p.mdns] enable = true diff --git a/homestar-runtime/src/cli.rs b/homestar-runtime/src/cli.rs index 30353fa9..a42f98b0 100644 --- a/homestar-runtime/src/cli.rs +++ b/homestar-runtime/src/cli.rs @@ -17,7 +17,7 @@ use tarpc::context; mod error; pub use error::Error; pub(crate) mod show; -pub(crate) use show::ConsoleTable; +pub use show::ConsoleTable; const DEFAULT_DB_PATH: &str = "homestar.db"; const TMP_DIR: &str = "/tmp"; @@ -134,17 +134,25 @@ pub enum Command { /// Supported: /// - JSON (.json). #[arg( - short='w', - long = "workflow", value_hint = clap::ValueHint::FilePath, value_name = "FILE", value_parser = clap::value_parser!(file::ReadWorkflow), + index = 1, + required = true, help = r#"IPVM-configured workflow file to run. Supported: - JSON (.json)"# )] workflow: file::ReadWorkflow, }, + /// Get node identity / information. + Node { + /// RPC host / port arguments. + #[clap(flatten)] + args: RpcArgs, + }, + /// Get Homestar binary and other information. + Info, } impl Command { @@ -154,6 +162,8 @@ impl Command { Command::Stop { .. } => "stop", Command::Ping { .. } => "ping", Command::Run { .. } => "run", + Command::Node { .. 
} => "node", + Command::Info => "info", } } @@ -195,6 +205,16 @@ impl Command { response.echo_table()?; Ok(()) } + Command::Node { args } => { + let response = rt.block_on(async { + let client = args.client().await?; + let response = client.node_info().await??; + Ok::(response) + })?; + + response.echo_table()?; + Ok(()) + } _ => Err(anyhow!("Invalid command {}", self.name()).into()), } } diff --git a/homestar-runtime/src/cli/show.rs b/homestar-runtime/src/cli/show.rs index d38bdd2d..51bc1dea 100644 --- a/homestar-runtime/src/cli/show.rs +++ b/homestar-runtime/src/cli/show.rs @@ -18,7 +18,7 @@ pub(crate) const TABLE_TITLE: &str = "homestar(╯°□°)╯"; /// Output response wrapper. #[derive(Debug, Clone, PartialEq)] -pub(crate) struct Output(String); +pub struct Output(String); impl fmt::Display for Output { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -47,14 +47,17 @@ impl Output { } /// Trait for console table output responses. -pub(crate) trait ConsoleTable { +pub trait ConsoleTable { + /// Get the table as an output response. fn table(&self) -> Output; + /// Print the table to console. fn echo_table(&self) -> Result<(), io::Error>; } /// Style trait for console table output responses. 
pub(crate) trait ApplyStyle { fn default(&mut self) -> Output; + fn default_with_title(&mut self, ext_title: &str) -> Output; } impl ApplyStyle for Table { @@ -69,4 +72,16 @@ impl ApplyStyle for Table { Output(table) } + + fn default_with_title(&mut self, ext_title: &str) -> Output { + let table = self + .with(Style::modern()) + .with(Panel::header(format!("{TABLE_TITLE} - {ext_title}"))) + .with(Modify::new(Rows::first()).with(Alignment::left())) + .with(BorderColor::filled(Color::FG_WHITE)) + .with(BorderSpanCorrection) + .to_string(); + + Output(table) + } } diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index facc796a..997538e6 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -34,6 +34,17 @@ pub(crate) use event::Event; type P2PSender = channel::AsyncChannelSender; +struct Quorum { + /// Minimum number of peers required to receive a receipt. + receipt: usize, + /// Minimum number of peers required to receive workflow information. + workflow: usize, +} + +struct Bootstrap { + interval: Duration, +} + /// Handler trait for [EventHandler] events. #[async_trait] pub(crate) trait Handler @@ -52,10 +63,7 @@ where #[cfg_attr(docsrs, doc(cfg(feature = "websocket-notify")))] #[allow(missing_debug_implementations, dead_code)] pub(crate) struct EventHandler { - /// Minimum number of peers required to receive a receipt. - receipt_quorum: usize, - /// Minimum number of peers required to receive workflow information. - workflow_quorum: usize, + quorum: Quorum, /// Timeout for p2p workflow info record requests. p2p_workflow_info_timeout: Duration, /// Timeout for p2p workflow info record requests from a provider. @@ -94,16 +102,15 @@ pub(crate) struct EventHandler { external_address_limit: u32, /// Interval for polling the cache for expired entries. poll_cache_interval: Duration, + /// Bootstrap configuration. 
+ bootstrap: Bootstrap, } /// Event loop handler for libp2p network events and commands. #[cfg(not(feature = "websocket-notify"))] #[allow(missing_debug_implementations, dead_code)] pub(crate) struct EventHandler { - /// Minimum number of peers required to receive a receipt. - receipt_quorum: usize, - /// Minimum number of peers required to receive workflow information. - workflow_quorum: usize, + quorum: Quorum, /// Timeout for p2p workflow info record requests. p2p_workflow_info_timeout: Duration, /// Timeout for p2p workflow info record requests from a provider. @@ -136,6 +143,8 @@ pub(crate) struct EventHandler { external_address_limit: u32, /// Interval for polling the cache for expired entries. poll_cache_interval: Duration, + /// Bootstrap configuration. + bootstrap: Bootstrap, } /// Rendezvous protocol configurations and state @@ -179,8 +188,10 @@ where let (sender, receiver) = Self::setup_channel(settings); let sender = Arc::new(sender); Self { - receipt_quorum: settings.libp2p.dht.receipt_quorum, - workflow_quorum: settings.libp2p.dht.workflow_quorum, + quorum: Quorum { + receipt: settings.libp2p.dht.receipt_quorum, + workflow: settings.libp2p.dht.workflow_quorum, + }, p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout, p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout, db, @@ -208,6 +219,9 @@ where announce_addresses: settings.libp2p.announce_addresses.clone(), external_address_limit: settings.libp2p.max_announce_addresses, poll_cache_interval: settings.poll_cache_interval, + bootstrap: Bootstrap { + interval: settings.libp2p.bootstrap_interval, + }, } } @@ -221,8 +235,10 @@ where let (sender, receiver) = Self::setup_channel(settings); let sender = Arc::new(sender); Self { - receipt_quorum: settings.libp2p.dht.receipt_quorum, - workflow_quorum: settings.libp2p.dht.workflow_quorum, + quorum: Quorum { + receipt: settings.libp2p.dht.receipt_quorum, + workflow: settings.libp2p.dht.workflow_quorum, + }, 
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout, p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout, db, @@ -248,6 +264,9 @@ where announce_addresses: settings.libp2p.announce_addresses.clone(), external_address_limit: settings.libp2p.max_announce_addresses, poll_cache_interval: settings.poll_cache_interval, + bootstrap: Bootstrap { + interval: settings.libp2p.bootstrap_interval, + }, } } diff --git a/homestar-runtime/src/event_handler/cache.rs b/homestar-runtime/src/event_handler/cache.rs index 10e5b89a..2ad0ec6e 100644 --- a/homestar-runtime/src/event_handler/cache.rs +++ b/homestar-runtime/src/event_handler/cache.rs @@ -3,8 +3,11 @@ use crate::{channel, event_handler::Event}; use libp2p::PeerId; use moka::{ - future::Cache, - notification::RemovalCause::{self, Expired}, + future::{Cache, FutureExt}, + notification::{ + ListenerFuture, + RemovalCause::{self, Expired}, + }, Expiry as ExpiryBase, }; use std::{ @@ -49,6 +52,7 @@ pub(crate) enum CacheData { /// Events to be dispatched on cache expiration. 
#[derive(Clone, Debug)] pub(crate) enum DispatchEvent { + Bootstrap, RegisterPeer, DiscoverPeers, DialPeer, @@ -58,38 +62,51 @@ pub(crate) enum DispatchEvent { pub(crate) fn setup_cache( sender: Arc>, ) -> Cache { - let eviction_listener = move |_key: Arc, val: CacheValue, cause: RemovalCause| { + let eviction_listener = move |_key: Arc, + val: CacheValue, + cause: RemovalCause| + -> ListenerFuture { let tx = Arc::clone(&sender); - if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") { - if cause != Expired { - return; - } - - match event { - DispatchEvent::RegisterPeer => { - if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node") - { - let _ = tx.send(Event::RegisterPeer(rendezvous_node.to_owned())); - }; - } - DispatchEvent::DiscoverPeers => { - if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node") - { - let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned())); - }; - } - DispatchEvent::DialPeer => { - if let Some(CacheData::Peer(node)) = val.data.get("node") { - let _ = tx.send(Event::DialPeer(node.to_owned())); - }; + async move { + if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") { + if cause == Expired { + match event { + DispatchEvent::Bootstrap => { + let _ = tx.send_async(Event::Bootstrap).await; + } + DispatchEvent::RegisterPeer => { + if let Some(CacheData::Peer(rendezvous_node)) = + val.data.get("rendezvous_node") + { + let _ = tx + .send_async(Event::RegisterPeer(rendezvous_node.to_owned())) + .await; + }; + } + DispatchEvent::DiscoverPeers => { + if let Some(CacheData::Peer(rendezvous_node)) = + val.data.get("rendezvous_node") + { + let _ = tx + .send_async(Event::DiscoverPeers(rendezvous_node.to_owned())) + .await; + }; + } + DispatchEvent::DialPeer => { + if let Some(CacheData::Peer(node)) = val.data.get("node") { + let _ = tx.send(Event::DialPeer(node.to_owned())); + }; + } + } } } } + .boxed() }; Cache::builder() .expire_after(Expiry) - 
.eviction_listener(eviction_listener) + .async_eviction_listener(eviction_listener) .build() } diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index ce095e7e..c54d6c92 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -134,6 +134,8 @@ pub(crate) enum Event { GetNodeInfo(AsyncChannelSender), /// Dial a peer. DialPeer(PeerId), + /// Bootstrap the node to join the DHT. + Bootstrap, } #[allow(unreachable_patterns)] @@ -301,6 +303,38 @@ impl Event { .dial(peer_id) .map_err(anyhow::Error::new)?; } + Event::Bootstrap => { + // Bootstrapping requires at least one node of the DHT to be + // known. + // + // See `libp2p::Behaviour::add_address`. + if event_handler + .swarm + .connected_peers() + .peekable() + .peek() + .is_some() + { + let _ = event_handler + .swarm + .behaviour_mut() + .kademlia + .bootstrap() + .map(|_| { + debug!( + subject = "libp2p.kad.bootstrap", + category = "handle_event", + "bootstrapped kademlia" + ) + }) + .map_err(|err| { + warn!(subject = "libp2p.kad.bootstrap.err", + category = "handle_event", + err=?err, + "error bootstrapping kademlia") + }); + } + } _ => {} } Ok(()) @@ -391,14 +425,14 @@ impl Captured { } } - let receipt_quorum = if event_handler.receipt_quorum > 0 { - unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.receipt_quorum)) } + let receipt_quorum = if event_handler.quorum.receipt > 0 { + unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.quorum.receipt)) } } else { Quorum::One }; - let workflow_quorum = if event_handler.workflow_quorum > 0 { - unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.receipt_quorum)) } + let workflow_quorum = if event_handler.quorum.workflow > 0 { + unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.quorum.receipt)) } } else { Quorum::One }; diff --git a/homestar-runtime/src/event_handler/swarm_event.rs 
b/homestar-runtime/src/event_handler/swarm_event.rs index 5fb0162c..c5cc0982 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -523,7 +523,7 @@ async fn handle_swarm_event( QueryResult::Bootstrap(Ok(BootstrapOk { peer, .. })) => debug!( subject = "libp2p.kad.bootstrap", category = "handle_swarm_event", - "successfully bootstrapped peer: {peer}" + "successfully bootstrapped node: {peer}" ), QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key: _, @@ -733,7 +733,7 @@ async fn handle_swarm_event( ), btreemap! { "cid" => Ipld::String(key.cid.to_string()), - "quorum" => Ipld::Integer(event_handler.receipt_quorum as i128), + "quorum" => Ipld::Integer(event_handler.quorum.receipt as i128), }, ), CapsuleTag::Workflow => notification::emit_event( @@ -743,7 +743,7 @@ async fn handle_swarm_event( ), btreemap! { "cid" => Ipld::String(key.cid.to_string()), - "quorum" => Ipld::Integer(event_handler.workflow_quorum as i128), + "quorum" => Ipld::Integer(event_handler.quorum.workflow as i128), }, ), } @@ -775,7 +775,7 @@ async fn handle_swarm_event( ), btreemap! { "cid" => Ipld::String(key.cid.to_string()), - "quorum" => Ipld::Integer(event_handler.receipt_quorum as i128), + "quorum" => Ipld::Integer(event_handler.quorum.receipt as i128), "connectedPeers" => Ipld::Integer(event_handler.connections.peers.len() as i128), "storedToPeers" => Ipld::List(success.iter().map(|cid| Ipld::String(cid.to_string())).collect()) }, @@ -787,7 +787,7 @@ async fn handle_swarm_event( ), btreemap! 
{ "cid" => Ipld::String(key.cid.to_string()), - "quorum" => Ipld::Integer(event_handler.workflow_quorum as i128), + "quorum" => Ipld::Integer(event_handler.quorum.workflow as i128), "connectedPeers" => Ipld::Integer(event_handler.connections.peers.len() as i128), "storedToPeers" => Ipld::List(success.iter().map(|cid| Ipld::String(cid.to_string())).collect()) }, @@ -1089,6 +1089,53 @@ async fn handle_swarm_event( "address" => Ipld::String(address.to_string()) }, ); + + // Init bootstrapping of the DHT + // + // Bootstrapping requires at least one node of the DHT to be + // known. + // + // See `libp2p::Behaviour::add_address`. + if event_handler + .swarm + .connected_peers() + .peekable() + .peek() + .is_some() + { + let _ = event_handler + .swarm + .behaviour_mut() + .kademlia + .bootstrap() + .map(|_| { + debug!( + subject = "libp2p.kad.bootstrap", + category = "handle_swarm_event", + "bootstrapped kademlia" + ) + }) + .map_err(|err| { + warn!(subject = "libp2p.kad.bootstrap.err", + category = "handle_swarm_event", + err=?err, + "error bootstrapping kademlia") + }); + } + + event_handler + .cache + .insert( + "bootstrap".to_string(), + CacheValue::new( + event_handler.bootstrap.interval, + HashMap::from([( + "on_expiration".to_string(), + CacheData::OnExpiration(cache::DispatchEvent::Bootstrap), + )]), + ), + ) + .await; } SwarmEvent::IncomingConnection { .. 
} => {} SwarmEvent::ConnectionEstablished { diff --git a/homestar-runtime/src/main.rs b/homestar-runtime/src/main.rs index 9639b3aa..cd14ad6c 100644 --- a/homestar-runtime/src/main.rs +++ b/homestar-runtime/src/main.rs @@ -1,11 +1,12 @@ use clap::Parser; use homestar_runtime::{ - cli::{Cli, Command}, + cli::{Cli, Command, ConsoleTable}, daemon, db::Database, + runner::response, Db, FileLogger, Logger, Runner, Settings, }; -use miette::Result; +use miette::{miette, Result}; use tracing::info; fn main() -> Result<()> { @@ -55,6 +56,12 @@ fn main() -> Result<()> { info!("starting Homestar runtime..."); Runner::start(settings, db).expect("Failed to start runtime") } + Command::Info => { + let response = response::Info::default(); + response + .echo_table() + .map_err(|_| miette!("failed to extract binary information"))? + } cmd => cmd.handle_rpc_command()?, } Ok(()) diff --git a/homestar-runtime/src/network/rpc.rs b/homestar-runtime/src/network/rpc.rs index 90890762..7e455119 100644 --- a/homestar-runtime/src/network/rpc.rs +++ b/homestar-runtime/src/network/rpc.rs @@ -47,6 +47,12 @@ pub(crate) enum ServerMessage { /// /// [Workflow]: homestar_workflow::Workflow RunErr(runner::Error), + /// Message sent to the [Runner] to identify the node. + /// + /// [Runner]: crate::Runner + NodeInfo, + /// Acknowledgement of the node's identity/info. + NodeInfoAck(response::AckNodeInfo), /// For skipping server messages. Skip, } @@ -63,6 +69,8 @@ pub(crate) trait Interface { async fn ping() -> String; /// Stop the server. async fn stop() -> Result<(), Error>; + /// Identify the node. + async fn node_info() -> Result; } /// RPC server state information. 
@@ -155,6 +163,33 @@ impl Interface for ServerHandler { .await .map_err(|e| Error::FailureToSendOnChannel(e.to_string())) } + async fn node_info(self, _: context::Context) -> Result { + let (tx, rx) = AsyncChannel::oneshot(); + self.runner_sender + .send_async((ServerMessage::NodeInfo, Some(tx))) + .await + .map_err(|e| Error::FailureToSendOnChannel(e.to_string()))?; + + let now = time::Instant::now(); + select! { + Ok(msg) = rx.recv_async() => { + match msg { + ServerMessage::NodeInfoAck(response) => { + println!("response: {:?}", response); + Ok(response) + } + _ => Err(Error::FailureToSendOnChannel("unexpected message".into())), + } + }, + _ = time::sleep_until(now + self.timeout) => { + let s = format!("server timeout of {} ms reached", self.timeout.as_millis()); + info!(subject = "rpc.timeout", + category = "rpc", + "{s}"); + Err(Error::FailureToReceiveOnChannel(s)) + } + } + } } impl Server { @@ -257,6 +292,11 @@ impl Client { self.cli.stop(self.ctx).await } + /// Identify the node. + pub async fn node_info(&self) -> Result, RpcError> { + self.cli.node_info(self.ctx).await + } + /// Run a [Workflow]. 
/// /// [Workflow]: homestar_workflow::Workflow diff --git a/homestar-runtime/src/network/swarm.rs b/homestar-runtime/src/network/swarm.rs index 195038e1..a84f819b 100644 --- a/homestar-runtime/src/network/swarm.rs +++ b/homestar-runtime/src/network/swarm.rs @@ -11,20 +11,27 @@ use anyhow::{Context, Result}; use const_format::formatcp; use enum_assoc::Assoc; use faststr::FastStr; +use futures::future::Either; use libp2p::{ - core::upgrade, + core::{ + muxing::StreamMuxerBox, + transport::{self, OptionalTransport}, + upgrade, + }, + dns, gossipsub::{self, MessageId, TopicHash}, identify, + identity::Keypair, kad::{ self, store::{MemoryStore, MemoryStoreConfig}, }, mdns, multiaddr::Protocol, - noise, rendezvous, + noise, quic, rendezvous, request_response::{self, ProtocolSupport}, swarm::{self, behaviour::toggle::Toggle, NetworkBehaviour, Swarm}, - tcp, yamux, PeerId, StreamProtocol, Transport, + yamux, PeerId, StreamProtocol, Transport, }; use serde::{Deserialize, Serialize}; use std::fmt; @@ -50,12 +57,7 @@ pub(crate) async fn new(settings: &settings::Network) -> Result for ComposedEvent { ComposedEvent::Identify(event) } } + +fn build_transport( + settings: &settings::Network, + keypair: Keypair, +) -> Result> { + let build_tcp = || libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new().nodelay(true)); + let build_ws_or_tcp = libp2p::websocket::WsConfig::new(build_tcp()).or_transport(build_tcp()); + let build_quic = if settings.libp2p.quic.enable { + OptionalTransport::some(quic::tokio::Transport::new(quic::Config::new(&keypair))) + } else { + OptionalTransport::none() + }; + + let transport = build_ws_or_tcp + .upgrade(upgrade::Version::V1Lazy) + .authenticate(noise::Config::new(&keypair)?) 
+ .multiplex(yamux::Config::default()) + .timeout(settings.libp2p.transport_connection_timeout) + .or_transport(build_quic) + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }); + + let transport = if let Ok((conf, opts)) = hickory_resolver::system_conf::read_system_conf() { + info!( + subject = "swarm.init", + category = "libp2p.swarm", + "using system DNS configuration from /etc/resolv.conf" + ); + dns::tokio::Transport::custom(transport, conf, opts) + } else { + info!( + subject = "swarm.init", + category = "libp2p.swarm", + "using cloudflare DNS configuration as a fallback" + ); + dns::tokio::Transport::custom( + transport, + dns::ResolverConfig::cloudflare(), + dns::ResolverOpts::default(), + ) + }; + + Ok(transport.boxed()) +} diff --git a/homestar-runtime/src/network/webserver/rpc.rs b/homestar-runtime/src/network/webserver/rpc.rs index c0d646d3..9e14343f 100644 --- a/homestar-runtime/src/network/webserver/rpc.rs +++ b/homestar-runtime/src/network/webserver/rpc.rs @@ -182,7 +182,6 @@ where } })?; - #[cfg(not(test))] module.register_async_method(NODE_INFO_ENDPOINT, |_, ctx| async move { let (tx, rx) = crate::channel::AsyncChannel::oneshot(); ctx.runner_sender diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index 5e0a55e2..00471455 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -45,7 +45,7 @@ use tracing::{debug, error, info, warn}; mod error; pub(crate) mod file; mod nodeinfo; -pub(crate) mod response; +pub mod response; pub(crate) use error::Error; pub(crate) use nodeinfo::{DynamicNodeInfo, StaticNodeInfo}; @@ -268,10 +268,16 @@ impl Runner { match handle { Ok(ControlFlow::Break(())) => break now.elapsed(), Ok(ControlFlow::Continue(rpc::ServerMessage::Skip)) => {}, + Ok(ControlFlow::Continue(msg @ rpc::ServerMessage::NodeInfoAck(_))) => { + 
debug!(subject = "rpc.ack", + category = "rpc", + "sending node_info message to rpc server"); + let _ = oneshot_tx.send_async(msg).await; + }, Ok(ControlFlow::Continue(msg @ rpc::ServerMessage::RunAck(_))) => { debug!(subject = "rpc.ack", category = "rpc", - "sending message to rpc server"); + "sending workflow_run message to rpc server"); let _ = oneshot_tx.send_async(msg).await; }, Err(err) => { @@ -609,6 +615,26 @@ impl Runner { now: time::Instant, ) -> Result> { match msg { + rpc::ServerMessage::NodeInfo => { + info!( + subject = "rpc.command", + category = "rpc", + "RPC node command received, sending node info" + ); + + let (tx, rx) = AsyncChannel::oneshot(); + let _ = self.event_sender.send_async(Event::GetNodeInfo(tx)).await; + + let dyn_node_info = if let Ok(info) = rx.recv_async().await { + info + } else { + DynamicNodeInfo::default() + }; + + Ok(ControlFlow::Continue(rpc::ServerMessage::NodeInfoAck( + response::AckNodeInfo::new(self.node_info.clone(), dyn_node_info), + ))) + } rpc::ServerMessage::ShutdownCmd => { info!( subject = "rpc.command", @@ -631,6 +657,11 @@ impl Runner { } } rpc::ServerMessage::Run((name, workflow_file)) => { + info!( + subject = "rpc.command", + category = "rpc", + "RPC run command received, running workflow" + ); let (workflow, workflow_settings) = workflow_file.validate_and_parse().await.with_context(|| { format!("failed to validate/parse workflow @ path: {workflow_file}",) diff --git a/homestar-runtime/src/runner/nodeinfo.rs b/homestar-runtime/src/runner/nodeinfo.rs index 82fd8aff..5b8f1804 100644 --- a/homestar-runtime/src/runner/nodeinfo.rs +++ b/homestar-runtime/src/runner/nodeinfo.rs @@ -2,15 +2,22 @@ use libp2p::{Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, fmt}; +use tabled::Tabled; /// Static node information available at startup. 
-#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Tabled)] pub(crate) struct StaticNodeInfo { /// The [PeerId] of a node. pub(crate) peer_id: PeerId, } +impl fmt::Display for StaticNodeInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "peer_id: {}", self.peer_id) + } +} + impl StaticNodeInfo { /// Create an instance of [StaticNodeInfo]. pub(crate) fn new(peer_id: PeerId) -> Self { @@ -34,6 +41,16 @@ pub(crate) struct DynamicNodeInfo { pub(crate) connections: HashMap, } +impl fmt::Display for DynamicNodeInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "listeners: {:?}, connections: {:?}", + self.listeners, self.connections + ) + } +} + impl DynamicNodeInfo { /// Create an instance of [DynamicNodeInfo]. pub(crate) fn new(listeners: Vec, connections: HashMap) -> Self { diff --git a/homestar-runtime/src/runner/response.rs b/homestar-runtime/src/runner/response.rs index 1d326c17..d00acb85 100644 --- a/homestar-runtime/src/runner/response.rs +++ b/homestar-runtime/src/runner/response.rs @@ -18,6 +18,8 @@ use tabled::{ Table, Tabled, }; +use super::{DynamicNodeInfo, StaticNodeInfo}; + /// Workflow information specified for response / display upon /// acknowledgement of running a workflow. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Tabled)] @@ -109,14 +111,14 @@ impl show::ConsoleTable for AckWorkflow { let receipt_table = receipt_table_builder.build(); - let tbl = col![table, resource_table, receipt_table].default(); + let tbl = col![table, resource_table, receipt_table].default_with_title("run"); tbl.echo() } } /// Ping response for display. 
-#[derive(Tabled)] +#[derive(Debug, Tabled)] pub(crate) struct Ping { address: SocketAddr, response: String, @@ -131,7 +133,118 @@ impl Ping { impl show::ConsoleTable for Ping { fn table(&self) -> show::Output { - Table::new(vec![&self]).default() + Table::new(vec![&self]).default_with_title("ping/pong") + } + + fn echo_table(&self) -> Result<(), std::io::Error> { + self.table().echo() + } +} + +/// Node identity response for display. +#[derive(Debug, Clone, Serialize, Deserialize, Tabled)] +pub struct AckNodeInfo { + /// Static node information. + static_info: StaticNodeInfo, + /// Dynamic node information. + dyn_info: DynamicNodeInfo, +} + +impl fmt::Display for AckNodeInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "") + } +} + +impl AckNodeInfo { + /// Create a new [AckNodeInfo] response. + pub(crate) fn new(static_info: StaticNodeInfo, dyn_info: DynamicNodeInfo) -> Self { + Self { + static_info, + dyn_info, + } + } +} + +impl show::ConsoleTable for AckNodeInfo { + fn table(&self) -> show::Output { + show::Output::new(Table::new(vec![self]).to_string()) + } + + fn echo_table(&self) -> Result<(), std::io::Error> { + let static_info_table = Table::new(vec![&self.static_info]); + + let mut listeners_table = Table::new( + self.dyn_info + .listeners + .iter() + .map(|v| v.to_string()) + .collect::>(), + ); + + let conns = self + .dyn_info + .connections + .iter() + .map(|(k, v)| vec![k.to_string(), v.to_string()]) + .collect::>>(); + + let mut conns_table_builder = tabled::builder::Builder::from_iter(conns); + + // If there are no connections, add a placeholder row. 
+ if conns_table_builder.count_rows() == 0 { + conns_table_builder.push_record([ + "Connections".to_string(), + "".to_string(), + "".to_string(), + ]); + conns_table_builder.push_record(["".to_string(), "".to_string(), "".to_string()]); + } else { + conns_table_builder.insert_record( + 0, + ["Connections".to_string(), "".to_string(), "".to_string()], + ); + } + + listeners_table.with( + Modify::new(Rows::first()).with(Format::content(|_s| "Listen Addresses".to_string())), + ); + let conns_table = conns_table_builder.build(); + + let tbl = col![static_info_table, listeners_table, conns_table].default_with_title("node"); + + tbl.echo() + } +} + +/// Info response for display. +#[derive(Debug, Tabled)] +pub struct Info { + version: String, + git_sha: String, + features: String, +} + +impl Default for Info { + fn default() -> Self { + Self::new() + } +} + +impl Info { + /// Create a new [Info] response. + pub(crate) fn new() -> Self { + Self { + version: env!("CARGO_PKG_VERSION").to_string(), + git_sha: env!("VERGEN_GIT_SHA").to_string(), + features: env!("VERGEN_CARGO_FEATURES").to_string(), + } + } +} + +impl show::ConsoleTable for Info { + fn table(&self) -> show::Output { + Table::new(vec![&self]).default_with_title("info") } fn echo_table(&self) -> Result<(), std::io::Error> { diff --git a/homestar-runtime/src/settings/libp2p_config.rs b/homestar-runtime/src/settings/libp2p_config.rs index bb913873..6487a8cc 100644 --- a/homestar-runtime/src/settings/libp2p_config.rs +++ b/homestar-runtime/src/settings/libp2p_config.rs @@ -30,6 +30,8 @@ pub(crate) struct Libp2p { /// Multiaddrs of the trusted nodes to connect to on startup. #[serde_as(as = "Vec")] pub(crate) node_addresses: Vec, + /// Quic Settings. + pub(crate) quic: Quic, /// mDNS Settings. pub(crate) mdns: Mdns, /// Pubsub Settings. @@ -42,6 +44,16 @@ pub(crate) struct Libp2p { /// Dial interval. #[serde_as(as = "DurationSeconds")] pub(crate) dial_interval: Duration, + /// Bootstrap dial interval. 
+ #[serde_as(as = "DurationSeconds")] + pub(crate) bootstrap_interval: Duration, +} + +/// Quic settings. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub(crate) struct Quic { + /// Enable Quic transport. + pub(crate) enable: bool, } /// DHT settings. @@ -136,12 +148,14 @@ impl Default for Libp2p { listen_address: Uri::from_static("/ip4/0.0.0.0/tcp/0"), max_connected_peers: 32, max_announce_addresses: 10, + quic: Quic::default(), mdns: Mdns::default(), node_addresses: Vec::new(), pubsub: Pubsub::default(), rendezvous: Rendezvous::default(), transport_connection_timeout: Duration::new(60, 0), dial_interval: Duration::new(30, 0), + bootstrap_interval: Duration::new(30, 0), } } } @@ -170,6 +184,12 @@ impl Default for Dht { } } +impl Default for Quic { + fn default() -> Self { + Self { enable: true } + } +} + impl Default for Mdns { fn default() -> Self { Self { diff --git a/homestar-runtime/tests/cli.rs b/homestar-runtime/tests/cli.rs index d7fd8c85..30d22a36 100644 --- a/homestar-runtime/tests/cli.rs +++ b/homestar-runtime/tests/cli.rs @@ -4,6 +4,7 @@ use crate::{ make_config, utils::{ wait_for_socket_connection, wait_for_socket_connection_v6, ChildGuard, ProcInfo, BIN_NAME, + ED25519MULTIHASH, }, }; use anyhow::Result; @@ -107,6 +108,8 @@ fn test_server_integration() -> Result<()> { let toml = format!( r#" [node] + [node.network.keypair_config] + existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" }} [node.network.libp2p.mdns] enable = false [node.network.metrics] @@ -155,6 +158,16 @@ fn test_server_integration() -> Result<()> { .stdout(predicate::str::contains("::1")) .stdout(predicate::str::contains("pong")); + Command::new(BIN.as_os_str()) + .arg("node") + .arg("--host") + .arg("::1") + .arg("-p") + .arg(rpc_port.to_string()) + .assert() + .success() + .stdout(predicate::str::contains(ED25519MULTIHASH.to_string())); + Command::new(BIN.as_os_str()) .arg("ping") .arg("--host") @@ -212,7 +225,6 @@ fn 
test_workflow_run_integration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .assert() .success() @@ -227,7 +239,6 @@ fn test_workflow_run_integration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .assert() .success() diff --git a/homestar-runtime/tests/network.rs b/homestar-runtime/tests/network.rs index 05e054ca..24d09298 100644 --- a/homestar-runtime/tests/network.rs +++ b/homestar-runtime/tests/network.rs @@ -280,6 +280,7 @@ fn test_libp2p_connect_known_peers_integration() -> Result<()> { [node.network.libp2p] listen_address = "{listen_addr1}" node_addresses = ["{node_addrb}"] + bootstrap_interval = 1 [node.network.libp2p.mdns] enable = false [node.network.libp2p.rendezvous] @@ -388,6 +389,12 @@ fn test_libp2p_connect_known_peers_integration() -> Result<()> { let stdout1 = retrieve_output(dead_proc1); let stdout2 = retrieve_output(dead_proc2); + // Check that node bootstrapped itself on the 1 second delay. 
+ let bootstrapped = check_for_line_with( + stdout1.clone(), + vec!["successfully bootstrapped node", ED25519MULTIHASH], + ); + // Check node two was added to the Kademlia table let two_added_to_dht = check_for_line_with( stdout1.clone(), @@ -412,6 +419,7 @@ fn test_libp2p_connect_known_peers_integration() -> Result<()> { vec!["peer connection established", SECP256K1MULTIHASH], ); + assert!(bootstrapped); assert!(one_connected_to_two); assert!(two_in_dht_routing_table); assert!(two_added_to_dht); @@ -571,3 +579,71 @@ fn test_libp2p_disconnect_known_peers_integration() -> Result<()> { Ok(()) } + +#[test] +#[serial_test::parallel] +fn test_libp2p_configured_with_known_dns_multiaddr() -> Result<()> { + let proc_info = ProcInfo::new().unwrap(); + let rpc_port = proc_info.rpc_port; + let metrics_port = proc_info.metrics_port; + let ws_port = proc_info.ws_port; + let listen_addr = listen_addr(proc_info.listen_port); + + let known_peer_id = "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN"; + // from ipfs bootstrap list + let dns_node_addr = format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", known_peer_id); + let toml = format!( + r#" + [node] + [node.network.keypair_config] + existing = {{ key_type = "ed25519", path = "./fixtures/__testkey_ed25519_2.pem" }} + [node.network.libp2p] + listen_address = "{listen_addr}" + node_addresses = ["{dns_node_addr}"] + [node.network.libp2p.mdns] + enable = false + [node.network.libp2p.rendezvous] + enable_client = false + enable_server = false + [node.network.metrics] + port = {metrics_port} + [node.network.rpc] + port = {rpc_port} + [node.network.webserver] + port = {ws_port} + "# + ); + + let config = make_config!(toml); + + let homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("-c") + .arg(config.filename()) + .arg("--db") + .arg(&proc_info.db_path) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + let proc_guard = ChildGuard::new(homestar_proc); + + if wait_for_socket_connection_v6(rpc_port, 1000).is_err() { + 
panic!("Homestar server/runtime failed to start in time"); + } + + let dead_proc = kill_homestar(proc_guard.take(), None); + let stdout = retrieve_output(dead_proc); + + let multiaddr_not_supported = + check_for_line_with(stdout.clone(), vec!["MultiaddrNotSupported"]); + + // This can connect to known dns multiaddrs, but won't over GHA. + // let connected_to_known_peer = + // check_for_line_with(stdout, vec!["peer connection established", known_peer_id]); + // assert!(connected_to_known_peer); + + // Check that we don't receive a MultiaddrNotSupported error. + assert!(!multiaddr_not_supported); + + Ok(()) +} diff --git a/homestar-runtime/tests/network/dht.rs b/homestar-runtime/tests/network/dht.rs index 22308276..17da655b 100644 --- a/homestar-runtime/tests/network/dht.rs +++ b/homestar-runtime/tests/network/dht.rs @@ -194,7 +194,6 @@ fn test_libp2p_dht_records_integration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port1.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one-part-one.json") .output(); @@ -251,7 +250,6 @@ fn test_libp2p_dht_records_integration() -> Result<()> { // .arg("run") // .arg("-p") // .arg(rpc_port2.to_string()) - // .arg("-w") // .arg("tests/fixtures/test-workflow-add-one-part-two.json") // .output(); @@ -280,7 +278,6 @@ fn test_libp2p_dht_records_integration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port2.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one-part-one.json") .output(); @@ -509,7 +506,6 @@ fn test_libp2p_dht_quorum_failure_intregration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port1.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); @@ -728,7 +724,6 @@ fn test_libp2p_dht_workflow_info_provider_integration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port1.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); @@ -745,7 +740,6 @@ fn test_libp2p_dht_workflow_info_provider_integration() -> Result<()> { 
.arg("run") .arg("-p") .arg(rpc_port2.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); @@ -1133,7 +1127,6 @@ fn test_libp2p_dht_workflow_info_provider_recursive_integration() -> Result<()> .arg("run") .arg("-p") .arg(rpc_port1.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); @@ -1163,7 +1156,6 @@ fn test_libp2p_dht_workflow_info_provider_recursive_integration() -> Result<()> .arg("run") .arg("-p") .arg(rpc_port2.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); @@ -1211,7 +1203,6 @@ fn test_libp2p_dht_workflow_info_provider_recursive_integration() -> Result<()> .arg("run") .arg("-p") .arg(rpc_port3.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); diff --git a/homestar-runtime/tests/network/gossip.rs b/homestar-runtime/tests/network/gossip.rs index 0ee08fba..a38292ca 100644 --- a/homestar-runtime/tests/network/gossip.rs +++ b/homestar-runtime/tests/network/gossip.rs @@ -176,7 +176,6 @@ fn test_libp2p_receipt_gossip_integration() -> Result<()> { .arg("run") .arg("-p") .arg(rpc_port1.to_string()) - .arg("-w") .arg("tests/fixtures/test-workflow-add-one.json") .output(); diff --git a/homestar-workspace-hack/Cargo.toml b/homestar-workspace-hack/Cargo.toml index f1967932..fadf831a 100644 --- a/homestar-workspace-hack/Cargo.toml +++ b/homestar-workspace-hack/Cargo.toml @@ -57,6 +57,7 @@ jsonrpsee-core = { version = "0.21", features = ["async-client", "async-wasm-cli libc = { version = "0.2", features = ["extra_traits"] } libsecp256k1-core = { version = "0.3" } libsqlite3-sys = { version = "0.27", features = ["bundled"] } +memchr = { version = "2" } miette = { version = "5", features = ["fancy"] } miniz_oxide = { version = "0.7", features = ["simd"] } multibase = { version = "0.9" } @@ -68,12 +69,13 @@ regex-syntax = { version = "0.8" } retry = { version = "2" } rustc-hash = { version = "1" } scopeguard = { version = "1" } 
+semver = { version = "1", features = ["serde"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } -serde_json = { version = "1", features = ["alloc", "float_roundtrip", "raw_value"] } +serde_json = { version = "1", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] } sha2 = { version = "0.10" } signature = { version = "2", default-features = false, features = ["std"] } smallvec = { version = "1", default-features = false, features = ["union"] } -time = { version = "0.3", features = ["formatting", "macros", "parsing"] } +time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "test-util", "tracing"] } tokio-stream = { version = "0.1", features = ["net", "sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "time"] } @@ -134,6 +136,7 @@ jsonrpsee-core = { version = "0.21", features = ["async-client", "async-wasm-cli libc = { version = "0.2", features = ["extra_traits"] } libsecp256k1-core = { version = "0.3" } libsqlite3-sys = { version = "0.27", features = ["bundled"] } +memchr = { version = "2" } miette = { version = "5", features = ["fancy"] } miniz_oxide = { version = "0.7", features = ["simd"] } multibase = { version = "0.9" } @@ -146,14 +149,15 @@ regex-syntax = { version = "0.8" } retry = { version = "2" } rustc-hash = { version = "1" } scopeguard = { version = "1" } +semver = { version = "1", features = ["serde"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } -serde_json = { version = "1", features = ["alloc", "float_roundtrip", "raw_value"] } +serde_json = { version = "1", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] } sha2 = { version = "0.10" } signature = { version = "2", default-features = false, features = ["std"] } smallvec = { version = "1", default-features = false, features = ["union"] } 
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } -time = { version = "0.3", features = ["formatting", "macros", "parsing"] } +time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "rt-multi-thread", "signal", "test-util", "tracing"] } tokio-stream = { version = "0.1", features = ["net", "sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "time"] } @@ -211,6 +215,7 @@ iana-time-zone = { version = "0.1", default-features = false, features = ["fallb object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.x86_64-apple-darwin.build-dependencies] @@ -219,6 +224,7 @@ iana-time-zone = { version = "0.1", default-features = false, features = ["fallb object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.aarch64-apple-darwin.dependencies] @@ -227,6 +233,7 @@ iana-time-zone = { version = "0.1", default-features = false, features = ["fallb object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } 
rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.aarch64-apple-darwin.build-dependencies] @@ -235,6 +242,7 @@ iana-time-zone = { version = "0.1", default-features = false, features = ["fallb object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.x86_64-unknown-linux-gnu.dependencies] @@ -244,6 +252,7 @@ linux-raw-sys = { version = "0.4", default-features = false, features = ["elf", object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "thread", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.x86_64-unknown-linux-gnu.build-dependencies] @@ -253,6 +262,7 @@ linux-raw-sys = { version = "0.4", default-features = false, features = ["elf", object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "thread", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.x86_64-unknown-linux-musl.dependencies] @@ -262,6 +272,7 @@ linux-raw-sys = { version = "0.4", default-features = false, features = ["elf", object = { 
version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "thread", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.x86_64-unknown-linux-musl.build-dependencies] @@ -271,6 +282,7 @@ linux-raw-sys = { version = "0.4", default-features = false, features = ["elf", object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "thread", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.aarch64-unknown-linux-musl.dependencies] @@ -280,6 +292,7 @@ linux-raw-sys = { version = "0.4", default-features = false, features = ["elf", object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "thread", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle = { version = "2" } [target.aarch64-unknown-linux-musl.build-dependencies] @@ -289,6 +302,7 @@ linux-raw-sys = { version = "0.4", default-features = false, features = ["elf", object = { version = "0.32", default-features = false, features = ["archive", "read_core", "unaligned", "write"] } ring = { version = "0.17", features = ["std"] } rustix = { version = "0.38", features = ["event", "mm", "net", "param", "process", "procfs", "termios", "thread", "time"] } +rustls = { version = "0.21", features = ["dangerous_configuration", "quic"] } subtle 
= { version = "2" } ### END HAKARI SECTION