diff --git a/.gitignore b/.gitignore index bbc8610d..535ab18e 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,17 @@ homestar.pid # locks homestar-wasm/Cargo.lock + +# examples-npm +node_modules +examples/**/**/build +examples/**/**.svelte-kit +examples/**/**/package +examples/**/**/.env.* +examples/**/**/vite.config.js.timestamp-* +examples/**/**/vite.config.ts.timestamp-* + +!examples/**/**/.env.example + +# temp +examples/websocket-relay diff --git a/Cargo.lock b/Cargo.lock index c1f7dbd0..7927ed17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2378,7 +2378,6 @@ dependencies = [ "ipfs-api", "ipfs-api-backend-hyper", "itertools 0.11.0", - "json", "libipld", "libp2p", "libsqlite3-sys", @@ -2397,6 +2396,7 @@ dependencies = [ "semver", "serde", "serde_ipld_dagcbor", + "serde_json", "serde_with", "serial_test", "stream-cancel", diff --git a/Cargo.toml b/Cargo.toml index c93d48ab..76b84b82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [workspace] members = [ + # "examples/*", "homestar-core", "homestar-functions", "homestar-runtime", - "homestar-wasm" -] + "homestar-wasm"] resolver = "2" [workspace.package] diff --git a/examples/.gitkeep b/examples/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/homestar-core/src/workflow/input.rs b/homestar-core/src/workflow/input.rs index 2e607453..2009a6e3 100644 --- a/homestar-core/src/workflow/input.rs +++ b/homestar-core/src/workflow/input.rs @@ -48,13 +48,23 @@ where } /// Return *only* deferred/awaited inputs. - pub fn deferreds(&self) -> Vec { - self.0.iter().fold(vec![], |mut acc, input| { + pub fn deferreds(&self) -> impl Iterator + '_ { + self.0.iter().filter_map(|input| { if let Input::Deferred(awaited_promise) = input { - acc.push(awaited_promise.instruction_cid()); - acc + Some(awaited_promise.instruction_cid()) } else { - acc + None + } + }) + } + + /// Return *only* [Ipld::Link] [Cid]s. + pub fn links(&self) -> impl Iterator + '_ { + self.0.iter().filter_map(|input| { + if let Input::Ipld(Ipld::Link(link)) = input { + Some(link.to_owned()) + } else { + None } }) } diff --git a/homestar-core/src/workflow/receipt/metadata.rs b/homestar-core/src/workflow/receipt/metadata.rs index 3698b87d..8794a51a 100644 --- a/homestar-core/src/workflow/receipt/metadata.rs +++ b/homestar-core/src/workflow/receipt/metadata.rs @@ -5,7 +5,17 @@ /// Metadata key for an operation or function name. pub const OP_KEY: &str = "op"; +/// Metadata attributed to a boolean true/false value on whether +/// the computation was executed from scratch or not. +pub const REPLAYED_KEY: &str = "replayed"; + /// Metadata key for a workflow [Cid]. /// /// [Cid]: libipld::Cid pub const WORKFLOW_KEY: &str = "workflow"; + +/// Associated metadata key for a workflow name, which +/// will either be some identifier, or the [Cid] of the workflow. +/// +/// [Cid]: libipld::Cid +pub const WORKFLOW_NAME_KEY: &str = "name"; diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 4f7ec18e..e9524752 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -72,6 +72,7 @@ sec1 = { version = "0.7", features = ["pem", "der"] } semver = "1.0" serde = { version = "1.0", features = ["derive"] } serde_ipld_dagcbor = { workspace = true } +serde_json = { version = "1.0", optional = true } serde_with = { version = "3.2", features = ["base64"] } stream-cancel = "0.8" strum = { version = "0.25", features = ["derive"] } @@ -98,7 +99,6 @@ assert_cmd = "2.0" criterion = "0.5" homestar-core = { version = "0.1", path = "../homestar-core", features = [ "test-utils" ] } homestar_runtime_proc_macro = { path = "src/test_utils/proc_macro", package = "homestar-runtime-tests-proc-macro" } -json = "0.12" nix = "0.26" once_cell = "1.18" predicates = "3.0" @@ -110,11 +110,12 @@ tokio-tungstenite = "0.20" wait-timeout = "0.2" [features] -default = ["ipfs", "websocket-server"] +default = ["ipfs", "websocket-notify"] console = ["dep:console-subscriber"] ipfs = ["dep:ipfs-api", "dep:ipfs-api-backend-hyper"] profile = ["dep:puffin", "dep:puffin_egui"] test-utils = ["dep:proptest"] +websocket-notify = ["websocket-server", "dep:serde_json"] websocket-server = ["dep:axum"] [package.metadata.docs.rs] diff --git a/homestar-runtime/migrations/2023-06-04-135955_create_workflows/up.sql b/homestar-runtime/migrations/2023-06-04-135955_create_workflows/up.sql index 8d698a85..e34e21f0 100644 --- a/homestar-runtime/migrations/2023-06-04-135955_create_workflows/up.sql +++ b/homestar-runtime/migrations/2023-06-04-135955_create_workflows/up.sql @@ -1,5 +1,6 @@ CREATE TABLE workflows ( cid TEXT NOT NULL PRIMARY KEY, + name TEXT, num_tasks INTEGER NOT NULL, resources BLOB NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL, diff --git a/homestar-runtime/src/cli.rs b/homestar-runtime/src/cli.rs index 2a00b12f..b44290e0 100644 --- a/homestar-runtime/src/cli.rs +++ b/homestar-runtime/src/cli.rs @@ -105,6 +105,15 @@ pub enum Command { /// RPC host / port arguments. #[clap(flatten)] args: RpcArgs, + /// (optional) name of workflow. + #[arg( + short = 'n', + long = "name", + value_name = "NAME", + help = "(optional) name given to a workflow" + )] + name: Option, + /// Workflow file to run. #[arg( short='w', long = "workflow", @@ -113,7 +122,6 @@ pub enum Command { value_parser = clap::value_parser!(file::ReadWorkflow), help = "path to workflow file" )] - /// Workflow file to run. workflow: file::ReadWorkflow, }, } @@ -129,7 +137,7 @@ impl Command { } /// Handle CLI commands related to [Client] RPC calls. - pub fn handle_rpc_command(&self) -> Result<(), Error> { + pub fn handle_rpc_command(self) -> Result<(), Error> { // Spin up a new tokio runtime on the current thread. let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -154,11 +162,12 @@ impl Command { }), Command::Run { args, + name, workflow: workflow_file, } => { let response = rt.block_on(async { let client = args.client().await?; - let response = client.run(workflow_file.to_owned()).await??; + let response = client.run(name, workflow_file).await??; Ok::(response) })?; diff --git a/homestar-runtime/src/db.rs b/homestar-runtime/src/db.rs index eaf377d9..80ebdb0c 100644 --- a/homestar-runtime/src/db.rs +++ b/homestar-runtime/src/db.rs @@ -8,6 +8,7 @@ use crate::{ use anyhow::Result; use byte_unit::{AdjustedByte, Byte, ByteUnit}; use diesel::{ + dsl::now, prelude::*, r2d2::{self, CustomizeConnection, ManageConnection}, BelongingToDsl, Connection as SingleConnection, RunQueryDsl, SqliteConnection, @@ -69,7 +70,7 @@ impl Db { /// Database trait for working with different Sqlite connection pool and /// connection configurations. pub trait Database: Send + Sync + Clone { - /// Get database url. + /// Set database url. /// /// Contains a minimal side-effect to set the env if not already set. fn set_url(database_url: Option) -> Option { @@ -82,9 +83,14 @@ pub trait Database: Send + Sync + Clone { ) } + /// Get database url. + fn url() -> Result { + Ok(env::var(ENV)?) + } + /// Test a Sqlite connection to the database and run pending migrations. fn setup(url: &str) -> Result { - info!("Using database at {:?}", url); + info!("using database at {}", url); let mut connection = SqliteConnection::establish(url)?; let _ = connection.run_pending_migrations(MIGRATIONS); @@ -92,7 +98,10 @@ pub trait Database: Send + Sync + Clone { } /// Establish a pooled connection to Sqlite database. - fn setup_connection_pool(settings: &settings::Node) -> Result + fn setup_connection_pool( + settings: &settings::Node, + database_url: Option, + ) -> Result where Self: Sized; /// Get a pooled connection for the database. @@ -226,7 +235,7 @@ pub trait Database: Send + Sync + Clone { fn get_workflow_info( workflow_cid: Cid, conn: &mut Connection, - ) -> Result { + ) -> Result<(Option, workflow::Info), diesel::result::Error> { let workflow = Self::select_workflow(workflow_cid, conn)?; let associated_receipts = workflow::StoredReceipt::belonging_to(&workflow) .select(schema::workflows_receipts::receipt_cid) @@ -237,18 +246,29 @@ pub trait Database: Send + Sync + Clone { .map(|pointer: Pointer| pointer.cid()) .collect(); - Ok(workflow::Info::new( - workflow_cid, - workflow.num_tasks as u32, - cids, - workflow.resources, - )) + let name = workflow.name.clone(); + let info = workflow::Info::new(workflow, cids); + + Ok((name, info)) + } + + /// Update the local (view) name of a workflow. + fn update_local_name(name: String, conn: &mut Connection) -> Result<(), diesel::result::Error> { + diesel::update(schema::workflows::dsl::workflows) + .filter(schema::workflows::created_at.lt(now)) + .set(schema::workflows::name.eq(&name)) + .execute(conn)?; + + Ok(()) } } impl Database for Db { - fn setup_connection_pool(settings: &settings::Node) -> Result { - let database_url = env::var(ENV).unwrap_or_else(|_| { + fn setup_connection_pool( + settings: &settings::Node, + database_url: Option, + ) -> Result { + let database_url = Self::set_url(database_url).unwrap_or_else(|| { settings .db .url @@ -305,7 +325,7 @@ mod test { fn check_pragmas_memory_db() { let settings = TestSettings::load(); - let db = MemoryDb::setup_connection_pool(settings.node()).unwrap(); + let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap(); let mut conn = db.conn().unwrap(); let journal_mode = diesel::dsl::sql::("PRAGMA journal_mode") diff --git a/homestar-runtime/src/db/schema.rs b/homestar-runtime/src/db/schema.rs index 3d00e310..be6f8150 100644 --- a/homestar-runtime/src/db/schema.rs +++ b/homestar-runtime/src/db/schema.rs @@ -16,6 +16,7 @@ diesel::table! { diesel::table! { workflows (cid) { cid -> Text, + name -> Nullable, num_tasks -> Integer, resources -> Binary, created_at -> Timestamp, @@ -33,4 +34,4 @@ diesel::table! { diesel::joinable!(workflows_receipts -> receipts (receipt_cid)); diesel::joinable!(workflows_receipts -> workflows (workflow_cid)); -diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts,); +diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts); diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 543a9356..190b73f9 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -1,5 +1,7 @@ //! [EventHandler] implementation for handling network events and messages. +#[cfg(feature = "websocket-server")] +use crate::network::ws; #[cfg(feature = "ipfs")] use crate::network::IpfsCli; use crate::{ @@ -37,6 +39,27 @@ where } /// Event loop handler for [libp2p] network events and commands. +#[cfg(feature = "websocket-server")] +#[cfg_attr( + docsrs, + doc(cfg(all(feature = "websocket-server", feature = "websocket-notify"))) +)] +#[allow(missing_debug_implementations, dead_code)] +pub(crate) struct EventHandler { + receipt_quorum: usize, + workflow_quorum: usize, + p2p_provider_timeout: Duration, + db: DB, + swarm: Swarm, + sender: Arc>, + receiver: mpsc::Receiver, + query_senders: FnvHashMap, + request_response_senders: FnvHashMap, + ws_msg_sender: ws::Notifier, +} + +/// Event loop handler for [libp2p] network events and commands. +#[cfg(not(feature = "websocket-server"))] #[allow(missing_debug_implementations, dead_code)] pub(crate) struct EventHandler { receipt_quorum: usize, @@ -59,6 +82,30 @@ where } /// Create an [EventHandler] with channel sender/receiver defaults. + #[cfg(feature = "websocket-server")] + pub(crate) fn new( + swarm: Swarm, + db: DB, + settings: &settings::Node, + ws_msg_sender: ws::Notifier, + ) -> Self { + let (sender, receiver) = Self::setup_channel(settings); + Self { + receipt_quorum: settings.network.receipt_quorum, + workflow_quorum: settings.network.workflow_quorum, + p2p_provider_timeout: settings.network.p2p_provider_timeout, + db, + swarm, + sender: Arc::new(sender), + receiver, + query_senders: FnvHashMap::default(), + request_response_senders: FnvHashMap::default(), + ws_msg_sender, + } + } + + /// Create an [EventHandler] with channel sender/receiver defaults. + #[cfg(not(feature = "websocket-server"))] pub(crate) fn new(swarm: Swarm, db: DB, settings: &settings::Node) -> Self { let (sender, receiver) = Self::setup_channel(settings); Self { @@ -85,6 +132,18 @@ where self.sender.clone() } + /// [tokio::sync::broadcast::Sender] for sending messages through the + /// webSocket server to subscribers. + #[cfg(all(feature = "websocket-server", feature = "websocket-notify"))] + #[cfg_attr( + docsrs, + doc(cfg(all(feature = "websocket-server", feature = "websocket-notify"))) + )] + #[allow(dead_code)] + pub(crate) fn ws_sender(&self) -> ws::Notifier { + self.ws_msg_sender.clone() + } + /// Start [EventHandler] that matches on swarm and pubsub [events]. /// /// [events]: libp2p::swarm::SwarmEvent diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index 2cd088a3..c395b099 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -14,6 +14,8 @@ use crate::{ }; use anyhow::{anyhow, Result}; use async_trait::async_trait; +#[cfg(all(feature = "websocket-server", feature = "websocket-notify"))] +use homestar_core::ipld::DagJson; use homestar_core::workflow::Receipt as InvocationReceipt; use libipld::{Cid, Ipld}; use libp2p::{ @@ -28,7 +30,7 @@ use tracing::{error, info}; /// A [Receipt] captured (inner) event. #[derive(Debug, Clone)] -pub struct Captured { +pub(crate) struct Captured { /// The captured receipt. pub(crate) receipt: Receipt, /// The captured workflow information. @@ -40,7 +42,7 @@ pub struct Captured { /// A structured query for finding a [Record] in the DHT and /// returning to a [P2PSender]. #[derive(Debug, Clone)] -pub struct QueryRecord { +pub(crate) struct QueryRecord { /// The record identifier, which is a [Cid]. pub(crate) cid: Cid, /// The record capsule tag, which can be part of a key. @@ -52,7 +54,7 @@ pub struct QueryRecord { /// A structured query for finding a [Record] in the DHT and /// returning to a [P2PSender]. #[derive(Debug, Clone)] -pub struct PeerRequest { +pub(crate) struct PeerRequest { /// The peer to send a request to. pub(crate) peer: PeerId, /// The request key, which is a [Cid]. @@ -63,7 +65,8 @@ pub struct PeerRequest { /// Events to capture. #[derive(Debug)] -pub enum Event { +#[allow(dead_code)] +pub(crate) enum Event { /// [Receipt] captured event. CapturedReceipt(Captured), /// General shutdown event. @@ -299,24 +302,57 @@ where #[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] async fn handle_event(self, event_handler: &mut EventHandler, ipfs: IpfsCli) { match self { + #[cfg(all(feature = "websocket-server", feature = "websocket-notify"))] Event::CapturedReceipt(captured) => { let _ = captured.store(event_handler).map(|(cid, receipt)| { - // Spawn client call in background, without awaiting. - let handle = Handle::current(); - handle.spawn(async move { - match ipfs.put_receipt(receipt).await { - Ok(put_cid) => { - info!(cid = put_cid, "IPLD DAG node stored"); - - #[cfg(debug_assertions)] - debug_assert_eq!(put_cid, cid.to_string()); - } - Err(err) => { - info!(error=?err, cid=cid.to_string(), "Failed to store IPLD DAG node") - } + // Spawn client call in background, without awaiting. + let handle = Handle::current(); + // clone for IPLD conversion + let receipt_bytes: Vec = receipt.clone().try_into().unwrap(); + handle.spawn(async move { + match ipfs.put_receipt_bytes(receipt_bytes).await { + Ok(put_cid) => { + info!(cid = put_cid, "IPLD DAG node stored"); + + #[cfg(debug_assertions)] + debug_assert_eq!(put_cid, cid.to_string()); + } + Err(err) => { + info!(error=?err, cid=cid.to_string(), "Failed to store IPLD DAG node") + } + } + }); + + + let ws_tx = event_handler.ws_sender(); + handle.spawn(async move { + if let Ok(json_bytes) = receipt.to_json() + { + let _ = ws_tx.notify(json_bytes); + } + }); + + }); + } + #[cfg(not(any(feature = "websocket-server", feature = "websocket-notify")))] + Event::CapturedReceipt(captured) => { + let _ = captured.store(event_handler).map(|(cid, receipt)| { + // Spawn client call in background, without awaiting. + let handle = Handle::current(); + handle.spawn(async move { + match ipfs.put_receipt(receipt).await { + Ok(put_cid) => { + info!(cid = put_cid, "IPLD DAG node stored"); + + #[cfg(debug_assertions)] + debug_assert_eq!(put_cid, cid.to_string()); + } + Err(err) => { + info!(error=?err, cid=cid.to_string(), "Failed to store IPLD DAG node") } - }); + } }); + }); } event => { if let Err(err) = event.handle_info(event_handler).await { diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index e4391cb1..2adeea06 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -460,8 +460,11 @@ mod test { ); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let workflow_info = - workflow::Info::default(workflow.clone().to_cid().unwrap(), workflow.len()); + let stored_info = workflow::Stored::default( + Pointer::new(workflow.clone().to_cid().unwrap()), + workflow.len() as i32, + ); + let workflow_info = workflow::Info::default(stored_info); let workflow_cid_bytes = workflow_info.cid_as_bytes(); let bytes = workflow_info.capsule().unwrap(); let record = Record::new(workflow_cid_bytes, bytes); diff --git a/homestar-runtime/src/lib.rs b/homestar-runtime/src/lib.rs index 7af6ec55..d0599ea4 100644 --- a/homestar-runtime/src/lib.rs +++ b/homestar-runtime/src/lib.rs @@ -31,6 +31,11 @@ mod tasks; mod worker; pub mod workflow; +/// Test utilities. +#[cfg(any(test, feature = "test-utils"))] +#[cfg_attr(docsrs, doc(cfg(feature = "test-utils")))] +pub mod test_utils; + pub use db::Db; pub use event_handler::channel; pub use logger::*; @@ -39,8 +44,3 @@ pub use runner::Runner; pub use settings::Settings; pub(crate) use worker::Worker; pub use workflow::WORKFLOW_TAG; - -/// Test utilities. -#[cfg(any(test, feature = "test-utils"))] -#[cfg_attr(docsrs, doc(cfg(feature = "test-utils")))] -pub mod test_utils; diff --git a/homestar-runtime/src/main.rs b/homestar-runtime/src/main.rs index b30559b2..01341d01 100644 --- a/homestar-runtime/src/main.rs +++ b/homestar-runtime/src/main.rs @@ -23,18 +23,32 @@ fn main() -> Result<()> { } else { Settings::load() } - .expect("Failed to load settings"); + .expect("runtime settings to be loaded"); let _guard = if daemonize { - daemon::start(daemon_dir.clone()).expect("Failed to daemonize homestar runner"); + daemon::start(daemon_dir.clone()) + .expect("runner to be started as a daemon process"); FileLogger::init(daemon_dir) } else { Logger::init() }; - info!("starting with settings: {:?}", settings,); - Db::set_url(database_url).expect("Failed to set DB url"); - let db = Db::setup_connection_pool(settings.node()).expect("Failed to setup DB pool"); + info!( + subject = "settings", + category = "homestar_init", + "starting with settings: {:?}", + settings, + ); + + let db = Db::setup_connection_pool(settings.node(), database_url) + .expect("to setup database pool"); + + info!( + subject = "database", + category = "homestar_init", + "starting with database: {}", + Db::url().expect("database url to be provided"), + ); info!("starting Homestar runtime..."); Runner::start(settings, db).expect("Failed to start runtime") diff --git a/homestar-runtime/src/network/ipfs.rs b/homestar-runtime/src/network/ipfs.rs index bf4c41a6..76f61f6b 100644 --- a/homestar-runtime/src/network/ipfs.rs +++ b/homestar-runtime/src/network/ipfs.rs @@ -66,7 +66,7 @@ impl IpfsCli { let DagPutResponse { cid } = self .0 - .dag_put_with_options(Cursor::new(receipt_bytes.clone()), dag_builder) + .dag_put_with_options(Cursor::new(receipt_bytes), dag_builder) .await .expect("a CID"); diff --git a/homestar-runtime/src/network/rpc.rs b/homestar-runtime/src/network/rpc.rs index d52ec2a7..c810c44b 100644 --- a/homestar-runtime/src/network/rpc.rs +++ b/homestar-runtime/src/network/rpc.rs @@ -41,7 +41,7 @@ pub(crate) enum ServerMessage { /// /// [Runner]: crate::Runner GracefulShutdown(oneshot::Sender<()>), - Run(ReadWorkflow), + Run((Option, ReadWorkflow)), RunAck(response::AckWorkflow), RunErr(runner::Error), Skip, @@ -51,7 +51,10 @@ pub(crate) enum ServerMessage { #[tarpc::service] pub(crate) trait Interface { /// Returns a greeting for name. - async fn run(workflow_file: ReadWorkflow) -> Result; + async fn run( + name: Option, + workflow_file: ReadWorkflow, + ) -> Result; /// Ping the server. async fn ping() -> String; /// Stop the server. @@ -110,11 +113,12 @@ impl Interface for ServerHandler { async fn run( self, _: context::Context, + name: Option, workflow_file: ReadWorkflow, ) -> Result { let (tx, rx) = oneshot::channel(); self.runner_sender - .send((ServerMessage::Run(workflow_file), Some(tx))) + .send((ServerMessage::Run((name, workflow_file)), Some(tx))) .await .map_err(|e| Error::FailureToSendOnChannel(e.to_string()))?; @@ -245,8 +249,9 @@ impl Client { /// [Workflow]: homestar_core::Workflow pub async fn run( &self, + name: Option, workflow_file: ReadWorkflow, ) -> Result, RpcError> { - self.cli.run(self.ctx, workflow_file).await + self.cli.run(self.ctx, name, workflow_file).await } } diff --git a/homestar-runtime/src/network/ws.rs b/homestar-runtime/src/network/ws.rs index 6300aa14..ed0f8650 100644 --- a/homestar-runtime/src/network/ws.rs +++ b/homestar-runtime/src/network/ws.rs @@ -27,7 +27,26 @@ use tokio::{ use tracing::{debug, info}; /// Type alias for websocket sender. -pub type Sender = Arc>; +#[derive(Debug, Clone)] +pub(crate) struct Notifier(Arc>>); + +impl Notifier { + #[allow(dead_code)] + fn inner(&self) -> &Arc>> { + &self.0 + } + + #[allow(dead_code)] + fn into_inner(self) -> Arc>> { + self.0 + } + + /// Send a message to all connected websocket clients. + pub(crate) fn notify(&self, msg: Vec) -> Result<()> { + let _ = self.0.send(msg)?; + Ok(()) + } +} /// Message type for messages sent back from the /// websocket server to the [runner] for example. @@ -45,13 +64,15 @@ pub(crate) enum Message { #[derive(Clone)] pub(crate) struct Server { addr: SocketAddr, - msg_sender: Arc, + msg_sender: Notifier, } impl Server { /// Setup bounded, MPMC channel for runtime to send and received messages /// through the websocket connection(s). - fn setup_channel(capacity: usize) -> (broadcast::Sender, broadcast::Receiver) { + fn setup_channel( + capacity: usize, + ) -> (broadcast::Sender>, broadcast::Receiver>) { broadcast::channel(capacity) } @@ -71,7 +92,7 @@ impl Server { Ok(Self { addr, - msg_sender: Arc::new(sender.into()), + msg_sender: Notifier(sender.into()), }) } @@ -95,7 +116,7 @@ impl Server { /// Get websocket message sender for broadcasting messages to websocket /// clients. - pub(crate) fn sender(&self) -> Arc { + pub(crate) fn notifier(&self) -> Notifier { self.msg_sender.clone() } } @@ -150,17 +171,13 @@ async fn handle_socket(mut socket: ws::WebSocket, state: Server) { // By splitting socket we can send and receive at the same time. let (mut socket_sender, mut socket_receiver) = socket.split(); - let mut subscribed_rx = state.msg_sender.subscribe(); + let mut subscribed_rx = state.msg_sender.inner().subscribe(); let handle = Handle::current(); let mut send_task = handle.spawn(async move { while let Ok(msg) = subscribed_rx.recv().await { // In any websocket error, break loop. - if socket_sender - .send(AxumMsg::Binary(msg.into())) - .await - .is_err() - { + if socket_sender.send(AxumMsg::Binary(msg)).await.is_err() { break; } } diff --git a/homestar-runtime/src/receipt.rs b/homestar-runtime/src/receipt.rs index 0b7a399e..8a81b4a9 100644 --- a/homestar-runtime/src/receipt.rs +++ b/homestar-runtime/src/receipt.rs @@ -378,7 +378,7 @@ mod test { #[test] fn receipt_sql_roundtrip() { - let mut conn = MemoryDb::setup_connection_pool(Settings::load().unwrap().node()) + let mut conn = MemoryDb::setup_connection_pool(Settings::load().unwrap().node(), None) .unwrap() .conn() .unwrap(); diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index 86e6148d..80ab6bb0 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -43,13 +43,13 @@ pub(crate) use error::Error; const HOMESTAR_THREAD: &str = "homestar-runtime"; /// Type alias for a [DashMap] containing running worker [JoinHandle]s. -pub type RunningWorkerSet = DashMap>, delay_queue::Key)>; +pub(crate) type RunningWorkerSet = DashMap>, delay_queue::Key)>; /// Type alias for a [DashMap] containing running task [AbortHandle]s. -pub type RunningTaskSet = DashMap>; +pub(crate) type RunningTaskSet = DashMap>; /// Trait for managing a [DashMap] of running task information. -pub trait ModifiedSet { +pub(crate) trait ModifiedSet { /// Append or insert a new [AbortHandle] into the [RunningTaskSet]. fn append_or_insert(&self, cid: Cid, handles: Vec); } @@ -90,7 +90,6 @@ pub struct Runner { running_workers: RunningWorkerSet, runtime: tokio::runtime::Runtime, settings: Arc, - ws_msg_sender: Arc, ws_mpsc_sender: mpsc::Sender, } @@ -156,27 +155,27 @@ impl Runner { runtime: tokio::runtime::Runtime, ) -> Result { let swarm = runtime.block_on(swarm::new(settings.node()))?; - let event_handler = EventHandler::new(swarm, db, settings.node()); - let event_sender = event_handler.sender(); - - #[cfg(feature = "ipfs")] - let _event_handler_hdl = runtime.spawn({ - let ipfs = IpfsCli::default(); - event_handler.start(ipfs) - }); - - #[cfg(not(feature = "ipfs"))] - let _event_handler_hdl = runtime.spawn(event_handler.start()); #[cfg(feature = "websocket-server")] { // Setup websocket communication. let ws_server = ws::Server::new(settings.node().network())?; - let ws_msg_tx = ws_server.sender(); - + let ws_msg_tx = ws_server.notifier(); let (ws_tx, ws_rx) = mpsc::channel(settings.node.network.websocket_capacity); let _ws_hdl = runtime.spawn(ws_server.start(ws_rx)); + let event_handler = EventHandler::new(swarm, db, settings.node(), ws_msg_tx); + let event_sender = event_handler.sender(); + + #[cfg(feature = "ipfs")] + let _event_handler_hdl = runtime.spawn({ + let ipfs = IpfsCli::default(); + event_handler.start(ipfs) + }); + + #[cfg(not(feature = "ipfs"))] + let _event_handler_hdl = runtime.spawn(event_handler.start()); + Ok(Self { message_buffer_len: settings.node.network.events_buffer_len, event_sender, @@ -185,21 +184,34 @@ impl Runner { running_workers: DashMap::new(), runtime, settings: settings.into(), - ws_msg_sender: ws_msg_tx, ws_mpsc_sender: ws_tx, }) } #[cfg(not(feature = "websocket-server"))] - Ok(Self { - message_buffer_len: settings.node.network.events_buffer_len, - event_sender, - expiration_queue: Rc::new(AtomicRefCell::new(DelayQueue::new())), - running_tasks: DashMap::new().into(), - running_workers: DashMap::new(), - runtime, - settings: settings.into(), - }) + { + let event_handler = EventHandler::new(swarm, db, settings.node()); + let event_sender = event_handler.sender(); + + #[cfg(feature = "ipfs")] + let _event_handler_hdl = runtime.spawn({ + let ipfs = IpfsCli::default(); + event_handler.start(ipfs) + }); + + #[cfg(not(feature = "ipfs"))] + let _event_handler_hdl = runtime.spawn(event_handler.start()); + + Ok(Self { + message_buffer_len: settings.node.network.events_buffer_len, + event_sender, + expiration_queue: Rc::new(AtomicRefCell::new(DelayQueue::new())), + running_tasks: DashMap::new().into(), + running_workers: DashMap::new(), + runtime, + settings: settings.into(), + }) + } } /// Listen loop for [Runner] signals and messages. @@ -298,23 +310,15 @@ impl Runner { /// [mpsc::Sender] of the event-handler. /// /// [EventHandler]: crate::EventHandler - pub fn event_sender(&self) -> Arc> { + pub(crate) fn event_sender(&self) -> Arc> { self.event_sender.clone() } /// Getter for the [RunningTaskSet], cloned as an [Arc]. - pub fn running_tasks(&self) -> Arc { + pub(crate) fn running_tasks(&self) -> Arc { self.running_tasks.clone() } - /// [tokio::sync::broadcast::Sender] for sending messages through the - /// webSocket server to subscribers. - #[cfg(feature = "websocket-server")] - #[cfg_attr(docsrs, doc(cfg(feature = "websocket-server")))] - pub fn ws_msg_sender(&self) -> &ws::Sender { - &self.ws_msg_sender - } - /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] and /// workers in the [RunningWorkerSet]. #[allow(dead_code)] @@ -506,49 +510,38 @@ impl Runner { } } } - rpc::ServerMessage::Run(workflow_file) => { + rpc::ServerMessage::Run((name, workflow_file)) => { let (workflow, workflow_settings) = workflow_file.validate_and_parse().await.with_context(|| { format!("failed to validate/parse workflow @ path: {workflow_file}",) })?; - #[cfg(feature = "ipfs")] - let ipfs = IpfsCli::default(); - - #[cfg(feature = "ipfs")] let worker = { Worker::new( workflow, workflow_settings, + name, self.event_sender(), runner_sender, db.clone(), - ipfs, ) .await? }; - #[cfg(not(feature = "ipfs"))] - let worker = Worker::new( - workflow, - workflow_settings, - self.event_sender(), - runner_sender.into(), - db.clone(), - ) - .await?; - // Deliberate use of Arc::clone for readability, could just be // `clone`, as the underlying type is an `Arc`. let initial_info = Arc::clone(&worker.workflow_info); let workflow_timeout = worker.workflow_settings.timeout; + let workflow_name = worker.workflow_name.to_string(); let timestamp = worker.workflow_started; - // Spawn worker, which schedules execution graph and runs it. + // Spawn worker, which initializees the scheduler and runs + // the workflow. info!( cid = worker.workflow_info.cid.to_string(), "running workflow with settings: {:#?}", worker.workflow_settings ); + let handle = self.runtime.spawn(worker.run(self.running_tasks())); // Add Cid to expirations timing wheel @@ -563,7 +556,7 @@ impl Runner { .insert(initial_info.cid, (handle, delay_key)); Ok(ControlFlow::Continue(rpc::ServerMessage::RunAck( - response::AckWorkflow::new(initial_info, timestamp), + response::AckWorkflow::new(initial_info, workflow_name, timestamp), ))) } msg => { diff --git a/homestar-runtime/src/runner/file.rs b/homestar-runtime/src/runner/file.rs index 761691f4..b9de29d3 100644 --- a/homestar-runtime/src/runner/file.rs +++ b/homestar-runtime/src/runner/file.rs @@ -44,7 +44,7 @@ impl ReadWorkflow { match self.file.extension().and_then(OsStr::to_str) { None | Some("json") => { let data = fs::read_to_string(&self.file.canonicalize()?).await?; - // TODO: Parse this from the workflow file + // TODO: Parse this from the workflow data/file itself. let workflow_settings = workflow::Settings::default(); Ok(( DagJson::from_json_string(data).map_err(anyhow::Error::new)?, diff --git a/homestar-runtime/src/runner/response.rs b/homestar-runtime/src/runner/response.rs index 7020c076..ac78bfd4 100644 --- a/homestar-runtime/src/runner/response.rs +++ b/homestar-runtime/src/runner/response.rs @@ -20,6 +20,7 @@ use tabled::{ #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Tabled)] pub struct AckWorkflow { pub(crate) cid: Cid, + pub(crate) name: String, pub(crate) num_tasks: u32, #[tabled(skip)] pub(crate) progress: Vec, @@ -41,9 +42,14 @@ impl fmt::Display for AckWorkflow { impl AckWorkflow { /// Workflow information for response / display. - pub(crate) fn new(workflow_info: Arc, timestamp: NaiveDateTime) -> Self { + pub(crate) fn new( + workflow_info: Arc, + name: String, + timestamp: NaiveDateTime, + ) -> Self { Self { cid: workflow_info.cid, + name, num_tasks: workflow_info.num_tasks, progress: workflow_info.progress.clone(), progress_count: workflow_info.progress_count, diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index f8b23d8d..150b5a9d 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -2,6 +2,7 @@ //! [Workflow]. //! //! [Scheduler]: TaskScheduler +//! [Workflow]: homestar_core::Workflow use crate::{ db::{Connection, Database}, @@ -12,16 +13,13 @@ use crate::{ Event, }, network::swarm::CapsuleTag, - workflow::{self, Builder, IndexedResources, Resource, Vertex}, + workflow::{self, IndexedResources, Resource, Vertex}, Db, }; use anyhow::{anyhow, Result}; use dagga::Node; -use futures::future::LocalBoxFuture; -use homestar_core::{ - workflow::{InstructionResult, LinkMap, Pointer}, - Workflow, -}; +use futures::future::BoxFuture; +use homestar_core::workflow::{InstructionResult, LinkMap, Pointer}; use homestar_wasm::io::Arg; use indexmap::IndexMap; use libipld::Cid; @@ -44,6 +42,7 @@ pub(crate) struct ExecutionGraph<'a> { /// Vector of [resources] to fetch for executing functions in [Workflow]. /// /// [resources]: Resource + /// [Workflow]: homestar_core::Workflow pub(crate) indexed_resources: IndexedResources, } @@ -58,12 +57,14 @@ pub(crate) struct TaskScheduler<'a> { /// [ExecutionGraph] of what's been run so far for a [Workflow] of `batched` /// [Tasks]. /// + /// [Workflow]: homestar_core::Workflow /// [Tasks]: homestar_core::workflow::Task pub(crate) ran: Option>, /// [ExecutionGraph] of what's left to run for a [Workflow] of `batched` /// [Tasks]. /// + /// [Workflow]: homestar_core::Workflow /// [Tasks]: homestar_core::workflow::Task pub(crate) run: Schedule<'a>, @@ -75,16 +76,15 @@ pub(crate) struct TaskScheduler<'a> { /// /// This is transferred from the [ExecutionGraph] for executing the /// schedule by a worker. + /// + /// [Workflow]: homestar_core::Workflow pub(crate) resources: IndexMap>, } -/// Scheduler context containing the a schedule for executing tasks -/// and a map of [IndexedResources]. +/// Scheduler context containing the a schedule for executing tasks. pub(crate) struct SchedulerContext<'a> { /// Scheduler for a series of tasks, including what's run. pub(crate) scheduler: TaskScheduler<'a>, - /// Map of instructions => resources, for a [Workflow]. - pub(crate) indexed_resources: IndexedResources, } impl<'a> TaskScheduler<'a> { @@ -99,7 +99,7 @@ impl<'a> TaskScheduler<'a> { /// [Workflow]: homestar_core::Workflow #[allow(unknown_lints, clippy::needless_pass_by_ref_mut)] pub(crate) async fn init( - workflow: Workflow<'a, Arg>, + mut graph: Arc>, workflow_cid: Cid, settings: Arc, event_sender: Arc>, @@ -107,13 +107,13 @@ impl<'a> TaskScheduler<'a> { fetch_fn: F, ) -> Result> where - F: FnOnce(Vec) -> LocalBoxFuture<'a, Result>>>, + F: FnOnce(Vec) -> BoxFuture<'a, Result>>>, { - let builder = Builder::new(workflow); - let graph = builder.graph()?; - let mut schedule = graph.schedule; + let mut_graph = Arc::make_mut(&mut graph); + let schedule: &mut Schedule<'a> = mut_graph.schedule.as_mut(); let schedule_length = schedule.len(); let mut resources_to_fetch: Vec = vec![]; + let resume = schedule .iter() .enumerate() @@ -121,7 +121,7 @@ impl<'a> TaskScheduler<'a> { .try_for_each(|(idx, vec)| { let folded_pointers = vec.iter().try_fold(vec![], |mut ptrs, node| { let cid = Cid::from_str(node.name())?; - graph + mut_graph .indexed_resources .get(&cid) .map(|resource| { @@ -208,23 +208,21 @@ impl<'a> TaskScheduler<'a> { Ok(SchedulerContext { scheduler: Self { linkmap: Arc::new(linkmap.into()), - ran: Some(schedule), + ran: Some(schedule.to_vec()), run: pivot, resume_step: step, resources: fetched, }, - indexed_resources: graph.indexed_resources, }) } _ => Ok(SchedulerContext { scheduler: Self { linkmap: Arc::new(LinkMap::>::new().into()), ran: None, - run: schedule, + run: schedule.to_vec(), resume_step: None, resources: fetched, }, - indexed_resources: graph.indexed_resources, }), } } @@ -236,7 +234,7 @@ mod test { use crate::{ db::Database, test_utils::{self, db::MemoryDb}, - workflow as wf, Receipt, + workflow, Receipt, }; use futures::FutureExt; use homestar_core::{ @@ -246,6 +244,7 @@ mod test { config::Resources, instruction::RunInstruction, prf::UcanPrf, Invocation, Receipt as InvocationReceipt, Task, }, + Workflow, }; use libipld::Ipld; @@ -266,11 +265,11 @@ mod test { UcanPrf::default(), ); - let db = MemoryDb::setup_connection_pool(&settings.node).unwrap(); + let db = MemoryDb::setup_connection_pool(&settings.node, None).unwrap(); let mut conn = db.conn().unwrap(); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); let workflow_cid = workflow.clone().to_cid().unwrap(); - let workflow_settings = wf::Settings::default(); + let workflow_settings = workflow::Settings::default(); let fetch_fn = |_rscs: Vec| { { async { @@ -281,13 +280,16 @@ mod test { Ok(index_map) } } - .boxed_local() + .boxed() }; + let builder = workflow::Builder::new(workflow); + let graph = builder.graph().unwrap(); + let (tx, mut _rx) = test_utils::event::setup_event_channel(settings.node); let scheduler_ctx = TaskScheduler::init( - workflow, + graph.into(), workflow_cid, workflow_settings.into(), tx.into(), @@ -335,7 +337,7 @@ mod test { ) .unwrap(); - let db = MemoryDb::setup_connection_pool(&settings.node).unwrap(); + let db = MemoryDb::setup_connection_pool(&settings.node, None).unwrap(); let mut conn = db.conn().unwrap(); let stored_receipt = MemoryDb::store_receipt(receipt.clone(), &mut conn).unwrap(); @@ -343,7 +345,7 @@ mod test { let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); let workflow_cid = workflow.clone().to_cid().unwrap(); - let workflow_settings = wf::Settings::default(); + let workflow_settings = workflow::Settings::default(); let fetch_fn = |_rscs: Vec| { { async { @@ -354,13 +356,16 @@ mod test { Ok(index_map) } } - .boxed_local() + .boxed() }; let (tx, mut _rx) = test_utils::event::setup_event_channel(settings.node); + let builder = workflow::Builder::new(workflow); + let graph = builder.graph().unwrap(); + let scheduler_ctx = TaskScheduler::init( - workflow, + graph.into(), workflow_cid, workflow_settings.into(), tx.into(), @@ -429,14 +434,14 @@ mod test { ) .unwrap(); - let db = MemoryDb::setup_connection_pool(&settings.node).unwrap(); + let db = MemoryDb::setup_connection_pool(&settings.node, None).unwrap(); let mut conn = db.conn().unwrap(); let rows_inserted = MemoryDb::store_receipts(vec![receipt1, receipt2], &mut conn).unwrap(); assert_eq!(2, rows_inserted); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); let workflow_cid = workflow.clone().to_cid().unwrap(); - let workflow_settings = wf::Settings::default(); + let workflow_settings = workflow::Settings::default(); let fetch_fn = |_rscs: Vec| { async { let mut index_map = IndexMap::new(); @@ -444,13 +449,16 @@ mod test { index_map.insert(Resource::Url(instruction2.resource().to_owned()), vec![]); Ok(index_map) } - .boxed_local() + .boxed() }; let (tx, mut _rx) = test_utils::event::setup_event_channel(settings.node); + let builder = workflow::Builder::new(workflow); + let graph = builder.graph().unwrap(); + let scheduler_ctx = TaskScheduler::init( - workflow, + graph.into(), workflow_cid, workflow_settings.into(), tx.into(), diff --git a/homestar-runtime/src/tasks.rs b/homestar-runtime/src/tasks.rs index 84aed823..49f0e60d 100644 --- a/homestar-runtime/src/tasks.rs +++ b/homestar-runtime/src/tasks.rs @@ -7,6 +7,7 @@ use async_trait::async_trait; use enum_assoc::Assoc; use std::path::PathBuf; +pub(crate) mod fetch; mod wasm; pub(crate) use wasm::*; diff --git a/homestar-runtime/src/tasks/fetch.rs b/homestar-runtime/src/tasks/fetch.rs new file mode 100644 index 00000000..00bec544 --- /dev/null +++ b/homestar-runtime/src/tasks/fetch.rs @@ -0,0 +1,141 @@ +//! Fetch module for gathering data over the network related to [Task] +//! resources. +//! +//! [Task]: homestar_core::workflow::Task + +#[cfg(feature = "ipfs")] +use crate::network::IpfsCli; +#[cfg(any(test, feature = "test-utils"))] +use crate::tasks::WasmContext; +use crate::workflow::{self, Resource}; +use anyhow::Result; +#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] +use futures::{stream::FuturesUnordered, TryStreamExt}; +use indexmap::IndexMap; +#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] +use libipld::Cid; +use std::sync::Arc; +#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] +use tryhard::RetryFutureConfig; + +/// Gather resources from IPFS or elsewhere, leveraging an exponential backoff. +#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] +#[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] +pub(crate) async fn get_resources( + resources: Vec, + settings: Arc, + ipfs: IpfsCli, +) -> Result>> { + let settings = settings.as_ref(); + let tasks = FuturesUnordered::new(); + for rsc in resources.iter() { + let task = tryhard::retry_fn(|| async { fetch(rsc.clone(), ipfs.clone()).await }) + .with_config( + RetryFutureConfig::new(settings.retries) + .exponential_backoff(settings.retry_initial_delay) + .max_delay(settings.retry_max_delay), + ); + + tasks.push(task); + } + + tasks.try_collect::>().await?.into_iter().try_fold( + IndexMap::default(), + |mut acc, res| { + let answer = res.1?; + acc.insert(res.0, answer); + Ok::<_, anyhow::Error>(acc) + }, + ) +} + +/// Gather resources via URLs, leveraging an exponential backoff. +/// TODO: Client calls (only) over http(s). +#[cfg(all(not(feature = "ipfs"), not(test), not(feature = "test-utils")))] +#[allow(dead_code)] +pub(crate) async fn get_resources( + _resources: Vec, + _settings: Arc, +) -> Result> { + Ok(IndexMap::default()) +} + +#[cfg(all(not(feature = "ipfs"), any(test, feature = "test-utils")))] +#[doc(hidden)] +#[allow(dead_code)] +pub(crate) async fn get_resources( + _resources: Vec, + _settings: Arc, +) -> Result>> { + println!("Running in test mode"); + use crate::tasks::FileLoad; + let path = std::path::PathBuf::from(format!( + "{}/../homestar-wasm/fixtures/example_test.wasm", + env!("CARGO_MANIFEST_DIR") + )); + let bytes = WasmContext::load(path).await?; + let mut map = IndexMap::default(); + let rsc = "ipfs://bafybeihzvrlcfqf6ffbp2juhuakspxj2bdsc54cabxnuxfvuqy5lvfxapy"; + map.insert(Resource::Url(url::Url::parse(rsc)?), bytes); + Ok(map) +} + +#[cfg(all(feature = "ipfs", any(test, feature = "test-utils")))] +#[doc(hidden)] +#[allow(dead_code)] +pub(crate) async fn get_resources( + _resources: Vec, + _settings: Arc, + _ipfs: IpfsCli, +) -> Result>> { + println!("Running in test mode"); + use crate::tasks::FileLoad; + let path = std::path::PathBuf::from(format!( + "{}/../homestar-wasm/fixtures/example_test.wasm", + env!("CARGO_MANIFEST_DIR") + )); + let bytes = WasmContext::load(path).await?; + let mut map = IndexMap::default(); + let rsc = "ipfs://bafybeihzvrlcfqf6ffbp2juhuakspxj2bdsc54cabxnuxfvuqy5lvfxapy"; + map.insert(Resource::Url(url::Url::parse(rsc)?), bytes); + Ok(map) +} + +#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] +#[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] +async fn fetch(rsc: Resource, client: IpfsCli) -> Result<(Resource, Result>)> { + match rsc { + Resource::Url(url) => { + let bytes = match (url.scheme(), url.domain(), url.path()) { + ("ipfs", Some(cid), _) => { + let cid = Cid::try_from(cid)?; + client.get_cid(cid).await + } + (_, Some("ipfs.io"), _) => client.get_resource(&url).await, + (_, _, path) if path.contains("/ipfs/") || path.contains("/ipns/") => { + client.get_resource(&url).await + } + (_, Some(domain), _) => { + let split: Vec<&str> = domain.splitn(3, '.').collect(); + // subdomain-gateway case: + // + if let (Ok(_cid), "ipfs") = (Cid::try_from(split[0]), split[1]) { + client.get_resource(&url).await + } else { + // TODO: reqwest call + todo!() + } + } + // TODO: reqwest call + (_, _, _) => todo!(), + }; + Ok((Resource::Url(url), bytes)) + } + + Resource::Cid(cid) => { + // TODO: Check blockstore first. + let bytes = client.get_cid(cid).await; + Ok((Resource::Cid(cid), bytes)) + } + } +} diff --git a/homestar-runtime/src/test_utils/db.rs b/homestar-runtime/src/test_utils/db.rs index 4a7f3ecd..fbb0f4d1 100644 --- a/homestar-runtime/src/test_utils/db.rs +++ b/homestar-runtime/src/test_utils/db.rs @@ -38,8 +38,22 @@ impl Clone for MemoryDb { } impl Database for MemoryDb { - fn setup_connection_pool(settings: &settings::Node) -> Result { - let database_url = env::var(ENV).unwrap_or_else(|_| { + fn set_url(database_url: Option) -> Option { + database_url.map(|url| { + env::set_var(ENV, &url); + url + }) + } + + fn url() -> Result { + Ok(env::var(ENV)?) + } + + fn setup_connection_pool( + settings: &settings::Node, + database_url: Option, + ) -> Result { + let database_url = Self::set_url(database_url).unwrap_or_else(|| { settings .db .url diff --git a/homestar-runtime/src/test_utils/proc_macro/src/lib.rs b/homestar-runtime/src/test_utils/proc_macro/src/lib.rs index 43fc1fe1..40c957ed 100644 --- a/homestar-runtime/src/test_utils/proc_macro/src/lib.rs +++ b/homestar-runtime/src/test_utils/proc_macro/src/lib.rs @@ -101,7 +101,7 @@ pub fn runner_test(_attr: TokenStream, item: TokenStream) -> TokenStream { settings.node.network.websocket_port = ::homestar_core::test_utils::ports::get_port() as u16; settings.node.network.rpc_port = ::homestar_core::test_utils::ports::get_port() as u16; settings.node.db.url = Some(format!("{}.db", #func_name_as_string)); - let db = crate::test_utils::db::MemoryDb::setup_connection_pool(&settings.node).unwrap(); + let db = crate::test_utils::db::MemoryDb::setup_connection_pool(&settings.node, None).unwrap(); let runner = crate::Runner::start(settings.clone(), db).unwrap(); TestRunner { runner, settings } } diff --git a/homestar-runtime/src/test_utils/worker_builder.rs b/homestar-runtime/src/test_utils/worker_builder.rs index a759d5e9..b1bb84c9 100644 --- a/homestar-runtime/src/test_utils/worker_builder.rs +++ b/homestar-runtime/src/test_utils/worker_builder.rs @@ -1,8 +1,6 @@ //! Module for building out [Worker]s for testing purposes. use super::{db::MemoryDb, event}; -#[cfg(feature = "ipfs")] -use crate::network::IpfsCli; use crate::{ db::Database, event_handler::Event, settings, worker::WorkerMessage, workflow, Settings, Worker, }; @@ -17,31 +15,18 @@ use libipld::Cid; use std::sync::Arc; use tokio::sync::mpsc; -#[cfg(feature = "ipfs")] -pub(crate) struct WorkerBuilder<'a> { - db: MemoryDb, - event_sender: Arc>, - runner_sender: mpsc::Sender, - ipfs: IpfsCli, - workflow: Workflow<'a, Arg>, - workflow_settings: workflow::Settings, -} - -#[cfg(not(feature = "ipfs"))] pub(crate) struct WorkerBuilder<'a> { db: MemoryDb, event_sender: Arc>, runner_sender: mpsc::Sender, + name: Option, workflow: Workflow<'a, Arg>, workflow_settings: workflow::Settings, } impl<'a> WorkerBuilder<'a> { /// Create a new, default instance of a builder to generate a test [Worker]. - #[cfg(feature = "ipfs")] pub(crate) fn new(settings: settings::Node) -> Self { - let ipfs = IpfsCli::default(); - let config = Resources::default(); let (instruction1, instruction2, _) = workflow_test_utils::related_wasm_instructions::(); @@ -58,67 +43,26 @@ impl<'a> WorkerBuilder<'a> { let (evt_tx, _rx) = event::setup_event_channel(settings.clone()); let (wk_tx, _rx) = event::setup_worker_channel(settings.clone()); - Self { - db: MemoryDb::setup_connection_pool(&settings).unwrap(), - event_sender: evt_tx.into(), - runner_sender: wk_tx, - ipfs, - workflow: Workflow::new(vec![task1, task2]), - workflow_settings: workflow::Settings::default(), - } - } - /// Create a new, default instance of a builder to generate a test [Worker]. - #[cfg(not(feature = "ipfs"))] - pub(crate) fn new(settings: settings::Node) -> Self { - let config = Resources::default(); - let (instruction1, instruction2, _) = - workflow_test_utils::related_wasm_instructions::(); - let task1 = Task::new( - RunInstruction::Expanded(instruction1), - config.clone().into(), - UcanPrf::default(), - ); - let task2 = Task::new( - RunInstruction::Expanded(instruction2), - config.into(), - UcanPrf::default(), - ); - - let (evt_tx, _rx) = event::setup_event_channel(settings.clone()); - let (wk_tx, _rx) = event::setup_worker_channel(settings.clone()); + let workflow = Workflow::new(vec![task1, task2]); + let workflow_cid = workflow.clone().to_cid().unwrap(); Self { - db: MemoryDb::setup_connection_pool(&settings).unwrap(), + db: MemoryDb::setup_connection_pool(&settings, None).unwrap(), event_sender: evt_tx.into(), runner_sender: wk_tx, - workflow: Workflow::new(vec![task1, task2]), + name: Some(workflow_cid.to_string()), + workflow, workflow_settings: workflow::Settings::default(), } } /// Build a [Worker] from the current state of the builder. - #[cfg(feature = "ipfs")] - #[allow(dead_code)] - pub(crate) async fn build(self) -> Worker<'a, MemoryDb> { - Worker::new( - self.workflow, - self.workflow_settings, - self.event_sender, - self.runner_sender, - self.db, - self.ipfs, - ) - .await - .unwrap() - } - - /// Build a [Worker] from the current state of the builder. - #[cfg(not(feature = "ipfs"))] #[allow(dead_code)] pub(crate) async fn build(self) -> Worker<'a, MemoryDb> { Worker::new( self.workflow, self.workflow_settings, + self.name, self.event_sender, self.runner_sender, self.db, diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 8b3eec40..7e396ff4 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -6,8 +6,6 @@ #[cfg(feature = "ipfs")] use crate::network::IpfsCli; -#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] -use crate::workflow::settings::BackoffStrategy; use crate::{ db::Database, event_handler::{ @@ -18,23 +16,21 @@ use crate::{ }, network::swarm::CapsuleTag, runner::{ModifiedSet, RunningTaskSet}, - scheduler::TaskScheduler, - tasks::{RegisteredTasks, WasmContext}, + scheduler::{ExecutionGraph, TaskScheduler}, + tasks::{fetch, RegisteredTasks, WasmContext}, workflow::{self, Resource}, Db, Receipt, }; use anyhow::{anyhow, Result}; use chrono::NaiveDateTime; use futures::FutureExt; -#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] -use futures::StreamExt; use homestar_core::{ bail, ipld::DagCbor, workflow::{ error::ResolveError, prf::UcanPrf, - receipt::metadata::{OP_KEY, WORKFLOW_KEY}, + receipt::metadata::{OP_KEY, REPLAYED_KEY, WORKFLOW_KEY, WORKFLOW_NAME_KEY}, InstructionResult, LinkMap, Pointer, Receipt as InvocationReceipt, }, Workflow, @@ -51,8 +47,6 @@ use tokio::{ task::JoinSet, }; use tracing::{debug, error}; -#[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] -use tryhard::RetryFutureConfig; /// [JoinSet] of tasks run by a [Worker]. #[allow(dead_code)] @@ -67,10 +61,11 @@ pub(crate) enum WorkerMessage { #[allow(dead_code)] #[allow(missing_debug_implementations)] pub(crate) struct Worker<'a, DB: Database> { - pub(crate) scheduler: TaskScheduler<'a>, + pub(crate) graph: Arc>, pub(crate) event_sender: Arc>, pub(crate) runner_sender: mpsc::Sender, pub(crate) db: DB, + pub(crate) workflow_name: String, pub(crate) workflow_info: Arc, pub(crate) workflow_settings: Arc, pub(crate) workflow_started: NaiveDateTime, @@ -81,42 +76,32 @@ where DB: Database + 'static, { /// Instantiate a new [Worker] for a [Workflow]. - #[cfg(not(feature = "ipfs"))] + /// + /// TODO: integrate settings within workflow #[allow(dead_code)] pub(crate) async fn new( workflow: Workflow<'a, Arg>, settings: workflow::Settings, + // Name would be runner specific, separated from core workflow spec. + name: Option, event_sender: Arc>, runner_sender: mpsc::Sender, db: DB, ) -> Result> { let p2p_timeout = settings.p2p_timeout; let workflow_len = workflow.len(); - let workflow_settings = Arc::new(settings); - let workflow_settings_scheduler = workflow_settings.clone(); - let workflow_settings_worker = workflow_settings.clone(); - - let fetch_fn = |rscs: Vec| { - async { Self::get_resources(rscs, workflow_settings).await }.boxed_local() - }; - // Need to take ownership here to get the cid. let workflow_cid = workflow.to_owned().to_cid()?; - let scheduler_ctx = TaskScheduler::init( - workflow, - workflow_cid, - workflow_settings_scheduler, - event_sender.clone(), - &mut db.conn()?, - fetch_fn, - ) - .await?; + let builder = workflow::Builder::new(workflow); + let graph = builder.graph()?; + let name = name.unwrap_or(workflow_cid.to_string()); let (workflow_info, timestamp) = workflow::Info::init( workflow_cid, workflow_len, - scheduler_ctx.indexed_resources, + name.to_string(), + graph.indexed_resources.clone(), p2p_timeout, event_sender.clone(), db.conn()?, @@ -124,68 +109,13 @@ where .await?; Ok(Self { - scheduler: scheduler_ctx.scheduler, + graph: graph.into(), event_sender, runner_sender, db, + workflow_name: name, workflow_info: workflow_info.into(), - workflow_settings: workflow_settings_worker, - workflow_started: timestamp, - }) - } - - /// Instantiate a new [Worker] for a [Workflow]. - #[cfg(feature = "ipfs")] - #[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] - #[allow(dead_code)] - pub(crate) async fn new( - workflow: Workflow<'a, Arg>, - settings: workflow::Settings, - event_sender: Arc>, - runner_sender: mpsc::Sender, - db: DB, - ipfs: IpfsCli, - ) -> Result> { - let p2p_timeout = settings.p2p_timeout; - let workflow_len = workflow.len(); - let workflow_settings = Arc::new(settings); - let workflow_settings_scheduler = workflow_settings.clone(); - let workflow_settings_worker = workflow_settings.clone(); - - let fetch_fn = |rscs: Vec| { - async { Self::get_resources(rscs, workflow_settings, ipfs).await }.boxed_local() - }; - - // Need to take ownership here to get the cid. - let workflow_cid = workflow.to_owned().to_cid()?; - - let scheduler_ctx = TaskScheduler::init( - workflow, - workflow_cid, - workflow_settings_scheduler, - event_sender.clone(), - &mut db.conn()?, - fetch_fn, - ) - .await?; - - let (workflow_info, timestamp) = workflow::Info::init( - workflow_cid, - workflow_len, - scheduler_ctx.indexed_resources, - p2p_timeout, - event_sender.clone(), - db.conn()?, - ) - .await?; - - Ok(Self { - scheduler: scheduler_ctx.scheduler, - event_sender, - runner_sender, - db, - workflow_info: workflow_info.into(), - workflow_settings: workflow_settings_worker, + workflow_settings: settings.into(), workflow_started: timestamp, }) } @@ -209,10 +139,40 @@ where /// [Instruction]: homestar_core::workflow::Instruction /// [Swarm]: crate::network::swarm pub(crate) async fn run(self, running_tasks: Arc) -> Result<()> { - self.run_queue(running_tasks).await + let workflow_settings_fetch = self.workflow_settings.clone(); + #[cfg(feature = "ipfs")] + let fetch_fn = { + let ipfs = IpfsCli::default(); + + move |rscs: Vec| { + async move { fetch::get_resources(rscs, workflow_settings_fetch, ipfs).await } + .boxed() + } + }; + + #[cfg(not(feature = "ipfs"))] + let fetch_fn = |rscs: Vec| { + async move { fetch::get_resources(rscs, workflow_settings_fetch).await }.boxed() + }; + + let scheduler_ctx = TaskScheduler::init( + self.graph.clone(), // Arc'ed + self.workflow_info.cid, + self.workflow_settings.clone(), + self.event_sender.clone(), + &mut self.db.conn()?, + fetch_fn, + ) + .await?; + + self.run_queue(scheduler_ctx.scheduler, running_tasks).await } - async fn run_queue(mut self, running_tasks: Arc) -> Result<()> { + async fn run_queue( + mut self, + scheduler: TaskScheduler<'a>, + running_tasks: Arc, + ) -> Result<()> { async fn insert_into_map(map: Arc>>, key: Cid, value: T) where T: Clone, @@ -234,7 +194,8 @@ where if let Some(result) = linkmap.read().await.get(&cid) { Ok(result.to_owned()) } else { - match Db::find_instruction(cid, &mut db.conn()?) { + let conn = &mut db.conn()?; + match Db::find_instruction(cid, conn) { Ok(found) => Ok(found.output_as_arg()), Err(_) => { debug!("no related instruction receipt found in the DB"); @@ -265,8 +226,7 @@ where }; let receipt = - Db::commit_receipt(workflow_cid, found.clone(), &mut db.conn()?) - .unwrap_or(found); + Db::commit_receipt(workflow_cid, found.clone(), conn).unwrap_or(found); let found_result = receipt.output_as_arg(); // Store the result in the linkmap for use in next iterations. @@ -276,8 +236,8 @@ where } } } - // Need to take ownership of the schedule - for batch in self.scheduler.run.clone().into_iter() { + // Need to take ownership of the scheduler + for batch in scheduler.run.into_iter() { let (mut task_set, handles) = batch.into_iter().try_fold( (TaskSet::new(), vec![]), |(mut task_set, mut handles), node| { @@ -290,14 +250,18 @@ where let args = parsed.into_args(); let meta = Ipld::Map(BTreeMap::from([ + (REPLAYED_KEY.into(), scheduler.ran.is_some().into()), (OP_KEY.into(), fun.to_string().into()), (WORKFLOW_KEY.into(), self.workflow_info.cid().into()), + ( + WORKFLOW_NAME_KEY.into(), + self.workflow_name.to_string().into(), + ), ])); match RegisteredTasks::ability(&instruction.op().to_string()) { Some(RegisteredTasks::WasmRun) => { - let wasm = self - .scheduler + let wasm = scheduler .resources .get(&Resource::Url(rsc.to_owned())) .ok_or_else(|| anyhow!("resource not available"))? @@ -309,7 +273,7 @@ where let db = self.db.clone(); let settings = self.workflow_settings.clone(); - let linkmap = self.scheduler.linkmap.clone(); + let linkmap = scheduler.linkmap.clone(); let event_sender = self.event_sender.clone(); let workflow_cid = self.workflow_info.cid(); @@ -361,7 +325,7 @@ where ); let receipt = Receipt::try_with(instruction_ptr, &invocation_receipt)?; - self.scheduler.linkmap.write().await.insert( + scheduler.linkmap.write().await.insert( Cid::try_from(receipt.instruction())?, receipt.output_as_arg(), ); @@ -369,7 +333,7 @@ where // modify workflow info before progress update, in case // that we time out getting info from the network, but later // recovered where we last started from. - if let Some(step) = self.scheduler.resume_step { + if let Some(step) = scheduler.resume_step { let current_progress_count = self.workflow_info.progress_count; Arc::make_mut(&mut self.workflow_info) .set_progress_count(std::cmp::max(current_progress_count, step as u32)) @@ -392,174 +356,6 @@ where } Ok(()) } - - #[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] - #[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] - async fn get_resources( - resources: Vec, - settings: Arc, - ipfs: IpfsCli, - ) -> Result>> { - use tokio::runtime::Handle; - let num_requests = resources.len(); - let settings = settings.as_ref(); - futures::stream::iter(resources.iter().map(|rsc| async { - let ipfs = ipfs.clone(); - let handle = Handle::current(); - // Have to enumerate configs here, as type variants are different - // and cannot be matched on. - match settings.retry_backoff_strategy { - BackoffStrategy::Exponential => { - tryhard::retry_fn(|| { - let rsc = rsc.clone(); - let client = ipfs.clone(); - handle.spawn(async move { Self::fetch(rsc, client).await }) - }) - .with_config( - RetryFutureConfig::new(settings.retries) - .exponential_backoff(settings.retry_initial_delay) - .max_delay(settings.retry_max_delay), - ) - .await - } - BackoffStrategy::Fixed => { - tryhard::retry_fn(|| { - let rsc = rsc.clone(); - let client = ipfs.clone(); - handle.spawn(async move { Self::fetch(rsc, client).await }) - }) - .with_config( - RetryFutureConfig::new(settings.retries) - .fixed_backoff(settings.retry_initial_delay) - .max_delay(settings.retry_max_delay), - ) - .await - } - BackoffStrategy::Linear => { - tryhard::retry_fn(|| { - let rsc = rsc.clone(); - let client = ipfs.clone(); - handle.spawn(async move { Self::fetch(rsc, client).await }) - }) - .with_config( - RetryFutureConfig::new(settings.retries) - .linear_backoff(settings.retry_initial_delay) - .max_delay(settings.retry_max_delay), - ) - .await - } - BackoffStrategy::None => { - tryhard::retry_fn(|| { - let rsc = rsc.clone(); - let client = ipfs.clone(); - handle.spawn(async move { Self::fetch(rsc, client).await }) - }) - .with_config(RetryFutureConfig::new(settings.retries).no_backoff()) - .await - } - } - })) - .buffer_unordered(num_requests) - .collect::>() - .await - .into_iter() - .try_fold(IndexMap::default(), |mut acc, res| { - let inner = res??; - let answer = inner.1?; - acc.insert(inner.0, answer); - Ok::<_, anyhow::Error>(acc) - }) - } - - #[cfg(all(feature = "ipfs", not(test), not(feature = "test-utils")))] - #[cfg_attr(docsrs, doc(cfg(feature = "ipfs")))] - async fn fetch(rsc: Resource, client: IpfsCli) -> Result<(Resource, Result>)> { - match rsc { - Resource::Url(url) => { - let bytes = match (url.scheme(), url.domain(), url.path()) { - ("ipfs", Some(cid), _) => { - let cid = Cid::try_from(cid)?; - client.get_cid(cid).await - } - (_, Some("ipfs.io"), _) => client.get_resource(&url).await, - (_, _, path) if path.contains("/ipfs/") || path.contains("/ipns/") => { - client.get_resource(&url).await - } - (_, Some(domain), _) => { - let split: Vec<&str> = domain.splitn(3, '.').collect(); - // subdomain-gateway case: - // - if let (Ok(_cid), "ipfs") = (Cid::try_from(split[0]), split[1]) { - client.get_resource(&url).await - } else { - // TODO: reqwest call - todo!() - } - } - // TODO: reqwest call - (_, _, _) => todo!(), - }; - Ok((Resource::Url(url), bytes)) - } - - Resource::Cid(cid) => { - let bytes = client.get_cid(cid).await; - Ok((Resource::Cid(cid), bytes)) - } - } - } - - /// TODO: Client calls (only) over http(s). - #[cfg(all(not(feature = "ipfs"), not(test), not(feature = "test-utils")))] - #[doc(hidden)] - #[allow(dead_code)] - async fn get_resources( - _resources: Vec, - _settings: Arc, - ) -> Result> { - Ok(IndexMap::default()) - } - - #[cfg(all(not(feature = "ipfs"), any(test, feature = "test-utils")))] - #[doc(hidden)] - #[allow(dead_code)] - async fn get_resources( - _resources: Vec, - _settings: Arc, - ) -> Result>> { - println!("Running in test mode"); - use crate::tasks::FileLoad; - let path = std::path::PathBuf::from(format!( - "{}/../homestar-wasm/fixtures/example_test.wasm", - env!("CARGO_MANIFEST_DIR") - )); - let bytes = WasmContext::load(path).await?; - let mut map = IndexMap::default(); - let rsc = "ipfs://bafybeihzvrlcfqf6ffbp2juhuakspxj2bdsc54cabxnuxfvuqy5lvfxapy"; - map.insert(Resource::Url(url::Url::parse(rsc)?), bytes); - Ok(map) - } - - #[cfg(all(feature = "ipfs", any(test, feature = "test-utils")))] - #[doc(hidden)] - #[allow(dead_code)] - async fn get_resources( - _resources: Vec, - _settings: Arc, - _ipfs: IpfsCli, - ) -> Result>> { - println!("Running in test mode"); - use crate::tasks::FileLoad; - let path = std::path::PathBuf::from(format!( - "{}/../homestar-wasm/fixtures/example_test.wasm", - env!("CARGO_MANIFEST_DIR") - )); - let bytes = WasmContext::load(path).await?; - let mut map = IndexMap::default(); - let rsc = "ipfs://bafybeihzvrlcfqf6ffbp2juhuakspxj2bdsc54cabxnuxfvuqy5lvfxapy"; - map.insert(Resource::Url(url::Url::parse(rsc)?), bytes); - Ok(map) - } } impl<'a, DB> Drop for Worker<'a, DB> @@ -579,7 +375,7 @@ mod test { use crate::{ db::Database, test_utils::{self, db::MemoryDb, WorkerBuilder}, - workflow::IndexedResources, + workflow::{self, IndexedResources}, }; use homestar_core::{ ipld::DagCbor, @@ -600,10 +396,6 @@ mod test { let worker = builder.build().await; let workflow_cid = worker.workflow_info.cid; - assert!(worker.scheduler.linkmap.read().await.is_empty()); - assert!(worker.scheduler.ran.is_none()); - assert_eq!(worker.scheduler.run.len(), 2); - assert_eq!(worker.scheduler.resume_step, None); assert_eq!(worker.workflow_info.cid, workflow_cid); assert_eq!(worker.workflow_info.num_tasks, 2); assert_eq!(worker.workflow_info.resources.len(), 2); @@ -641,7 +433,8 @@ mod test { receipt: next_receipt, .. }) => { - let mut info = workflow::Info::default(workflow_cid, 2); + let stored = workflow::Stored::default(Pointer::new(workflow_cid), 2); + let mut info = workflow::Info::default(stored); info.increment_progress(next_receipt.cid()); (next_receipt, info) @@ -654,7 +447,8 @@ mod test { receipt: next_next_receipt, .. }) => { - let mut info = workflow::Info::default(workflow_cid, 2); + let stored = workflow::Stored::default(Pointer::new(workflow_cid), 2); + let mut info = workflow::Info::default(stored); info.increment_progress(next_next_receipt.cid()); assert_ne!(next_next_receipt, next_receipt); @@ -667,7 +461,7 @@ mod test { assert!(rx.recv().await.is_none()); let mut conn = db.conn().unwrap(); - let workflow_info = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); + let (_, workflow_info) = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); assert_eq!(workflow_info.num_tasks, 2); assert_eq!(workflow_info.cid, workflow_cid); @@ -732,6 +526,7 @@ mod test { let _ = MemoryDb::store_workflow( workflow::Stored::new_with_resources( Pointer::new(workflow_cid), + None, builder.workflow_len() as i32, IndexedResources::new(index_map), ), @@ -740,20 +535,9 @@ mod test { let _ = MemoryDb::commit_receipt(workflow_cid, receipt.clone(), &mut conn).unwrap(); let worker = builder.build().await; - let info = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); + let (_, info) = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); assert_eq!(Arc::new(info), worker.workflow_info); - - assert_eq!(worker.scheduler.linkmap.read().await.len(), 1); - assert!(worker - .scheduler - .linkmap - .read() - .await - .contains_key(&instruction1.to_cid().unwrap())); - assert_eq!(worker.scheduler.ran.as_ref().unwrap().len(), 1); - assert_eq!(worker.scheduler.run.len(), 1); - assert_eq!(worker.scheduler.resume_step, Some(1)); assert_eq!(worker.workflow_info.cid, workflow_cid); assert_eq!(worker.workflow_info.num_tasks, 2); assert_eq!(worker.workflow_info.resources.len(), 2); @@ -782,7 +566,8 @@ mod test { receipt: next_receipt, .. }) => { - let mut info = workflow::Info::default(workflow_cid, 2); + let stored = workflow::Stored::default(Pointer::new(workflow_cid), 2); + let mut info = workflow::Info::default(stored); info.increment_progress(next_receipt.cid()); assert_ne!(next_receipt, receipt); @@ -795,7 +580,7 @@ mod test { assert!(rx.recv().await.is_none()); let mut conn = db.conn().unwrap(); - let workflow_info = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); + let (_, workflow_info) = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); assert_eq!(workflow_info.num_tasks, 2); assert_eq!(workflow_info.cid, workflow_cid); @@ -872,6 +657,7 @@ mod test { let _ = MemoryDb::store_workflow( workflow::Stored::new_with_resources( Pointer::new(workflow_cid), + None, builder.workflow_len() as i32, IndexedResources::new(index_map), ), @@ -887,22 +673,6 @@ mod test { let worker = builder.build().await; - assert_eq!(worker.scheduler.linkmap.read().await.len(), 1); - assert!(!worker - .scheduler - .linkmap - .read() - .await - .contains_key(&instruction1.to_cid().unwrap())); - assert!(worker - .scheduler - .linkmap - .read() - .await - .contains_key(&instruction2.to_cid().unwrap())); - assert_eq!(worker.scheduler.ran.as_ref().unwrap().len(), 2); - assert!(worker.scheduler.run.is_empty()); - assert_eq!(worker.scheduler.resume_step, None); assert_eq!(worker.workflow_info.cid, workflow_cid); assert_eq!(worker.workflow_info.num_tasks, 2); assert_eq!(worker.workflow_info.resources.len(), 2); @@ -917,7 +687,7 @@ mod test { ); let mut conn = db.conn().unwrap(); - let workflow_info = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); + let (_, workflow_info) = MemoryDb::get_workflow_info(workflow_cid, &mut conn).unwrap(); assert_eq!(workflow_info.num_tasks, 2); assert_eq!(workflow_info.cid, workflow_cid); diff --git a/homestar-runtime/src/workflow.rs b/homestar-runtime/src/workflow.rs index 8e467560..62f0ad39 100644 --- a/homestar-runtime/src/workflow.rs +++ b/homestar-runtime/src/workflow.rs @@ -152,43 +152,48 @@ impl<'a> Builder<'a> { fn aot(self) -> anyhow::Result> { let lookup_table = self.lookup_table()?; - let (dag, resources) = - self.into_inner().tasks().into_iter().enumerate().try_fold( - (Dag::default(), IndexMap::new()), - |(mut dag, mut resources), (i, task)| { - let instr_cid = task.instruction_cid()?; - // Clone as we're owning the struct going backward. - let ptr: Pointer = Invocation::::from(task.clone()).try_into()?; - - let RunInstruction::Expanded(instr) = task.into_instruction() else { - bail!("workflow tasks/instructions must be expanded / inlined") - }; + let (dag, resources) = self.into_inner().tasks().into_iter().enumerate().try_fold( + (Dag::default(), IndexMap::new()), + |(mut dag, mut resources), (i, task)| { + let instr_cid = task.instruction_cid()?; + // Clone as we're owning the struct going backward. + let ptr: Pointer = Invocation::::from(task.clone()).try_into()?; + + let RunInstruction::Expanded(instr) = task.into_instruction() else { + bail!("workflow tasks/instructions must be expanded / inlined") + }; + resources + .entry(instr_cid) + .or_insert_with(|| Resource::Url(instr.resource().to_owned())); + + let parsed = instr.input().parse()?; + let reads = parsed + .args() + .deferreds() + .fold(vec![], |mut in_flow_reads, cid| { + if let Some(v) = lookup_table.get(&cid) { + in_flow_reads.push(*v) + } + // TODO: else, it's a Promise from another task outside + // of the workflow. + in_flow_reads + }); + + parsed.args().links().for_each(|cid| { resources .entry(instr_cid) - .or_insert_with(|| Resource::Url(instr.resource().to_owned())); - - let parsed = instr.input().parse()?; - let reads = parsed.args().deferreds().into_iter().fold( - vec![], - |mut in_flow_reads, cid| { - if let Some(v) = lookup_table.get(&cid) { - in_flow_reads.push(*v) - } - // TODO: else, it's a CID from another task outside - // of the workflow. - in_flow_reads - }, - ); - - let node = Node::new(Vertex::new(instr.to_owned(), parsed, ptr)) - .with_name(instr_cid.to_string()) - .with_result(i); - - dag.add_node(node.with_reads(reads)); - Ok::<_, anyhow::Error>((dag, resources)) - }, - )?; + .or_insert_with(|| Resource::Cid(cid.to_owned())); + }); + + let node = Node::new(Vertex::new(instr.to_owned(), parsed, ptr)) + .with_name(instr_cid.to_string()) + .with_result(i); + + dag.add_node(node.with_reads(reads)); + Ok::<_, anyhow::Error>((dag, resources)) + }, + )?; Ok(AOTContext { dag, diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 61c8d498..c040219a 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -18,6 +18,7 @@ use libipld::{cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Cid, Ipld}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeMap, + fmt, sync::Arc, time::{Duration, Instant}, }; @@ -42,6 +43,7 @@ const RESOURCES_KEY: &str = "resources"; #[diesel(table_name = crate::db::schema::workflows, primary_key(cid))] pub struct Stored { pub(crate) cid: Pointer, + pub(crate) name: Option, pub(crate) num_tasks: i32, pub(crate) resources: IndexedResources, pub(crate) created_at: NaiveDateTime, @@ -54,12 +56,14 @@ impl Stored { /// [db]: Database pub fn new( cid: Pointer, + name: Option, num_tasks: i32, resources: IndexedResources, created_at: NaiveDateTime, ) -> Self { Self { cid, + name, num_tasks, resources, created_at, @@ -70,9 +74,15 @@ impl Stored { /// Create a new [Stored] workflow for the [db] with a default timestamp. /// /// [db]: Database - pub fn new_with_resources(cid: Pointer, num_tasks: i32, resources: IndexedResources) -> Self { + pub fn new_with_resources( + cid: Pointer, + name: Option, + num_tasks: i32, + resources: IndexedResources, + ) -> Self { Self { cid, + name, num_tasks, resources, created_at: Utc::now().naive_utc(), @@ -84,8 +94,10 @@ impl Stored { /// /// [db]: Database pub fn default(cid: Pointer, num_tasks: i32) -> Self { + let name = cid.to_string(); Self { cid, + name: Some(name), num_tasks, resources: IndexedResources::default(), created_at: Utc::now().naive_utc(), @@ -132,40 +144,40 @@ pub struct Info { pub(crate) resources: IndexedResources, } +impl fmt::Display for Info { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "cid: {}, progress: {}/{}", + self.cid, self.progress_count, self.num_tasks + ) + } +} + impl Info { - /// Create a new workflow set of [Info] given a [Cid], progress / step, - /// [IndexedResources], and number of tasks. - pub fn new(cid: Cid, num_tasks: u32, progress: Vec, resources: IndexedResources) -> Self { + /// Create a workflow information structure from a [Stored] workflow and + /// `progress` vector. + pub fn new(stored: Stored, progress: Vec) -> Self { let progress_count = progress.len() as u32; + let cid = stored.cid.cid(); Self { cid, - num_tasks, + num_tasks: stored.num_tasks as u32, progress, progress_count, - resources, + resources: stored.resources, } } /// Create a default workflow [Info] given a [Cid] and number of tasks. - pub fn default(cid: Cid, num_tasks: u32) -> Self { - Self { - cid, - num_tasks, - progress: vec![], - progress_count: 0, - resources: IndexedResources::default(), - } - } - - /// Create a default workflow [Info] given a [Cid], number of tasks, - /// and [IndexedResources]. - pub fn default_with_resources(cid: Cid, num_tasks: u32, resources: IndexedResources) -> Self { + pub fn default(stored: Stored) -> Self { + let cid = stored.cid.cid(); Self { cid, - num_tasks, + num_tasks: stored.num_tasks as u32, progress: vec![], progress_count: 0, - resources, + resources: stored.resources, } } @@ -240,6 +252,7 @@ impl Info { pub(crate) async fn init( workflow_cid: Cid, workflow_len: u32, + name: String, resources: IndexedResources, p2p_timeout: Duration, event_sender: Arc>, @@ -247,7 +260,11 @@ impl Info { ) -> Result<(Self, NaiveDateTime)> { let timestamp = Utc::now().naive_utc(); match Db::get_workflow_info(workflow_cid, &mut conn) { - Ok(info) => Ok((info, timestamp)), + Ok((Some(stored_name), info)) if stored_name != name => { + Db::update_local_name(name, &mut conn)?; + Ok((info, timestamp)) + } + Ok((_, info)) => Ok((info, timestamp)), Err(_err) => { info!( cid = workflow_cid.to_string(), @@ -256,6 +273,7 @@ impl Info { let result = Db::store_workflow( Stored::new( Pointer::new(workflow_cid), + Some(name), workflow_len as i32, resources, timestamp, @@ -263,8 +281,7 @@ impl Info { &mut conn, )?; - let workflow_info = - Self::default_with_resources(workflow_cid, workflow_len, result.resources); + let workflow_info = Self::default(result); // spawn a task to retrieve the workflow info from the // network and store it in the database if it finds it. @@ -295,7 +312,7 @@ impl Info { .as_mut() .and_then(|conn| Db::get_workflow_info(workflow_cid, conn).ok()) { - Some(workflow_info) => Ok(workflow_info), + Some((_name, workflow_info)) => Ok(workflow_info), None => { info!( cid = workflow_cid.to_string(), @@ -463,7 +480,11 @@ mod test { ); let workflow = Workflow::new(vec![task1.clone(), task2.clone()]); - let mut workflow_info = Info::default(workflow.clone().to_cid().unwrap(), workflow.len()); + let stored_info = Stored::default( + Pointer::new(workflow.clone().to_cid().unwrap()), + workflow.len() as i32, + ); + let mut workflow_info = Info::default(stored_info); workflow_info.increment_progress(task1.to_cid().unwrap()); workflow_info.increment_progress(task2.to_cid().unwrap()); let ipld = Ipld::from(workflow_info.clone()); diff --git a/homestar-runtime/src/workflow/settings.rs b/homestar-runtime/src/workflow/settings.rs index 4f6da295..270517bc 100644 --- a/homestar-runtime/src/workflow/settings.rs +++ b/homestar-runtime/src/workflow/settings.rs @@ -8,7 +8,6 @@ use std::time::Duration; #[derive(Debug, Clone, PartialEq)] pub struct Settings { pub(crate) retries: u32, - pub(crate) retry_backoff_strategy: BackoffStrategy, pub(crate) retry_max_delay: Duration, pub(crate) retry_initial_delay: Duration, pub(crate) p2p_check_timeout: Duration, @@ -21,7 +20,6 @@ impl Default for Settings { fn default() -> Self { Self { retries: 10, - retry_backoff_strategy: BackoffStrategy::Exponential, retry_max_delay: Duration::new(60, 0), retry_initial_delay: Duration::from_millis(500), p2p_check_timeout: Duration::new(5, 0), @@ -36,7 +34,6 @@ impl Default for Settings { fn default() -> Self { Self { retries: 1, - retry_backoff_strategy: BackoffStrategy::Exponential, retry_max_delay: Duration::new(1, 0), retry_initial_delay: Duration::from_millis(50), p2p_check_timeout: Duration::from_millis(10), @@ -45,17 +42,3 @@ impl Default for Settings { } } } - -/// Backoff strategies supported for workflows. -#[allow(dead_code)] -#[derive(Debug, Clone, PartialEq)] -pub(crate) enum BackoffStrategy { - /// Exponential backoff: the delay will double each time. - Exponential, - /// Fixed backoff: the delay wont change between attempts. - Fixed, - /// Linear backoff: the delay will scale linearly with the number of attempts. - Linear, - /// No backoff: forcing just leveraging retries. - None, -} diff --git a/homestar-wasm/src/wasmtime/world.rs b/homestar-wasm/src/wasmtime/world.rs index 41177265..b2996deb 100644 --- a/homestar-wasm/src/wasmtime/world.rs +++ b/homestar-wasm/src/wasmtime/world.rs @@ -26,9 +26,6 @@ use wit_component::ComponentEncoder; // One unit of fuel represents around 100k instructions. const UNIT_OF_COMPUTE_INSTRUCTIONS: u64 = 100_000; -// TODO: Implement errors over thiserror and bubble up traps from here to -// our error set. - /// Incoming `state` from host runtime. #[allow(missing_debug_implementations)] pub struct State {