Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prep for first example application with various fixes and changes. #235

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,17 @@ homestar.pid

# locks
homestar-wasm/Cargo.lock

# examples-npm
node_modules
examples/**/**/build
examples/**/**.svelte-kit
examples/**/**/package
examples/**/**/.env.*
examples/**/**/vite.config.js.timestamp-*
examples/**/**/vite.config.ts.timestamp-*

!examples/**/**/.env.example

# temp
examples/websocket-relay
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[workspace]
members = [
# "examples/*",
"homestar-core",
"homestar-functions",
"homestar-runtime",
"homestar-wasm"
]
"homestar-wasm"]
resolver = "2"

[workspace.package]
Expand Down
Empty file added examples/.gitkeep
Empty file.
20 changes: 15 additions & 5 deletions homestar-core/src/workflow/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@ where
}

/// Return *only* deferred/awaited inputs.
pub fn deferreds(&self) -> Vec<Cid> {
self.0.iter().fold(vec![], |mut acc, input| {
pub fn deferreds(&self) -> impl Iterator<Item = Cid> + '_ {
self.0.iter().filter_map(|input| {
if let Input::Deferred(awaited_promise) = input {
acc.push(awaited_promise.instruction_cid());
acc
Some(awaited_promise.instruction_cid())
} else {
acc
None
}
})
}

/// Return *only* [Ipld::Link] [Cid]s.
pub fn links(&self) -> impl Iterator<Item = Cid> + '_ {
self.0.iter().filter_map(|input| {
if let Input::Ipld(Ipld::Link(link)) = input {
Some(link.to_owned())
} else {
None
}
})
}
Expand Down
10 changes: 10 additions & 0 deletions homestar-core/src/workflow/receipt/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@
/// Metadata key for an operation or function name.
pub const OP_KEY: &str = "op";

/// Metadata attributed to a boolean true/false value on whether
/// the computation was executed from scratch or not.
pub const REPLAYED_KEY: &str = "replayed";

/// Metadata key for a workflow [Cid].
///
/// [Cid]: libipld::Cid
pub const WORKFLOW_KEY: &str = "workflow";

/// Associated metadata key for a workflow name, which
/// will either be some identifier, or the [Cid] of the workflow.
///
/// [Cid]: libipld::Cid
pub const WORKFLOW_NAME_KEY: &str = "name";
5 changes: 3 additions & 2 deletions homestar-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ sec1 = { version = "0.7", features = ["pem", "der"] }
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_ipld_dagcbor = { workspace = true }
serde_json = { version = "1.0", optional = true }
serde_with = { version = "3.2", features = ["base64"] }
stream-cancel = "0.8"
strum = { version = "0.25", features = ["derive"] }
Expand All @@ -98,7 +99,6 @@ assert_cmd = "2.0"
criterion = "0.5"
homestar-core = { version = "0.1", path = "../homestar-core", features = [ "test-utils" ] }
homestar_runtime_proc_macro = { path = "src/test_utils/proc_macro", package = "homestar-runtime-tests-proc-macro" }
json = "0.12"
nix = "0.26"
once_cell = "1.18"
predicates = "3.0"
Expand All @@ -110,11 +110,12 @@ tokio-tungstenite = "0.20"
wait-timeout = "0.2"

[features]
default = ["ipfs", "websocket-server"]
default = ["ipfs", "websocket-notify"]
console = ["dep:console-subscriber"]
ipfs = ["dep:ipfs-api", "dep:ipfs-api-backend-hyper"]
profile = ["dep:puffin", "dep:puffin_egui"]
test-utils = ["dep:proptest"]
websocket-notify = ["websocket-server", "dep:serde_json"]
websocket-server = ["dep:axum"]

[package.metadata.docs.rs]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
CREATE TABLE workflows (
cid TEXT NOT NULL PRIMARY KEY,
name TEXT,
num_tasks INTEGER NOT NULL,
resources BLOB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
Expand Down
15 changes: 12 additions & 3 deletions homestar-runtime/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ pub enum Command {
/// RPC host / port arguments.
#[clap(flatten)]
args: RpcArgs,
/// (optional) name of workflow.
#[arg(
short = 'n',
long = "name",
value_name = "NAME",
help = "(optional) name given to a workflow"
)]
name: Option<String>,
/// Workflow file to run.
#[arg(
short='w',
long = "workflow",
Expand All @@ -113,7 +122,6 @@ pub enum Command {
value_parser = clap::value_parser!(file::ReadWorkflow),
help = "path to workflow file"
)]
/// Workflow file to run.
workflow: file::ReadWorkflow,
},
}
Expand All @@ -129,7 +137,7 @@ impl Command {
}

/// Handle CLI commands related to [Client] RPC calls.
pub fn handle_rpc_command(&self) -> Result<(), Error> {
pub fn handle_rpc_command(self) -> Result<(), Error> {
// Spin up a new tokio runtime on the current thread.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -154,11 +162,12 @@ impl Command {
}),
Command::Run {
args,
name,
workflow: workflow_file,
} => {
let response = rt.block_on(async {
let client = args.client().await?;
let response = client.run(workflow_file.to_owned()).await??;
let response = client.run(name, workflow_file).await??;
Ok::<response::AckWorkflow, Error>(response)
})?;

Expand Down
46 changes: 33 additions & 13 deletions homestar-runtime/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
use anyhow::Result;
use byte_unit::{AdjustedByte, Byte, ByteUnit};
use diesel::{
dsl::now,
prelude::*,
r2d2::{self, CustomizeConnection, ManageConnection},
BelongingToDsl, Connection as SingleConnection, RunQueryDsl, SqliteConnection,
Expand Down Expand Up @@ -69,7 +70,7 @@ impl Db {
/// Database trait for working with different Sqlite connection pool and
/// connection configurations.
pub trait Database: Send + Sync + Clone {
/// Get database url.
/// Set database url.
///
/// Contains a minimal side-effect to set the env if not already set.
fn set_url(database_url: Option<String>) -> Option<String> {
Expand All @@ -82,17 +83,25 @@ pub trait Database: Send + Sync + Clone {
)
}

/// Get database url.
fn url() -> Result<String> {
Ok(env::var(ENV)?)
}

/// Test a Sqlite connection to the database and run pending migrations.
fn setup(url: &str) -> Result<SqliteConnection> {
info!("Using database at {:?}", url);
info!("using database at {}", url);
let mut connection = SqliteConnection::establish(url)?;
let _ = connection.run_pending_migrations(MIGRATIONS);

Ok(connection)
}

/// Establish a pooled connection to Sqlite database.
fn setup_connection_pool(settings: &settings::Node) -> Result<Self>
fn setup_connection_pool(
settings: &settings::Node,
database_url: Option<String>,
) -> Result<Self>
where
Self: Sized;
/// Get a pooled connection for the database.
Expand Down Expand Up @@ -226,7 +235,7 @@ pub trait Database: Send + Sync + Clone {
fn get_workflow_info(
workflow_cid: Cid,
conn: &mut Connection,
) -> Result<workflow::Info, diesel::result::Error> {
) -> Result<(Option<String>, workflow::Info), diesel::result::Error> {
let workflow = Self::select_workflow(workflow_cid, conn)?;
let associated_receipts = workflow::StoredReceipt::belonging_to(&workflow)
.select(schema::workflows_receipts::receipt_cid)
Expand All @@ -237,18 +246,29 @@ pub trait Database: Send + Sync + Clone {
.map(|pointer: Pointer| pointer.cid())
.collect();

Ok(workflow::Info::new(
workflow_cid,
workflow.num_tasks as u32,
cids,
workflow.resources,
))
let name = workflow.name.clone();
let info = workflow::Info::new(workflow, cids);

Ok((name, info))
}

/// Update the local (view) name of a workflow.
fn update_local_name(name: String, conn: &mut Connection) -> Result<(), diesel::result::Error> {
diesel::update(schema::workflows::dsl::workflows)
.filter(schema::workflows::created_at.lt(now))
.set(schema::workflows::name.eq(&name))
.execute(conn)?;

Ok(())
}
}

impl Database for Db {
fn setup_connection_pool(settings: &settings::Node) -> Result<Self> {
let database_url = env::var(ENV).unwrap_or_else(|_| {
fn setup_connection_pool(
settings: &settings::Node,
database_url: Option<String>,
) -> Result<Self> {
let database_url = Self::set_url(database_url).unwrap_or_else(|| {
settings
.db
.url
Expand Down Expand Up @@ -305,7 +325,7 @@ mod test {
fn check_pragmas_memory_db() {
let settings = TestSettings::load();

let db = MemoryDb::setup_connection_pool(settings.node()).unwrap();
let db = MemoryDb::setup_connection_pool(settings.node(), None).unwrap();
let mut conn = db.conn().unwrap();

let journal_mode = diesel::dsl::sql::<diesel::sql_types::Text>("PRAGMA journal_mode")
Expand Down
3 changes: 2 additions & 1 deletion homestar-runtime/src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ diesel::table! {
diesel::table! {
workflows (cid) {
cid -> Text,
name -> Nullable<Text>,
num_tasks -> Integer,
resources -> Binary,
created_at -> Timestamp,
Expand All @@ -33,4 +34,4 @@ diesel::table! {
diesel::joinable!(workflows_receipts -> receipts (receipt_cid));
diesel::joinable!(workflows_receipts -> workflows (workflow_cid));

diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts,);
diesel::allow_tables_to_appear_in_same_query!(receipts, workflows, workflows_receipts);
59 changes: 59 additions & 0 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! [EventHandler] implementation for handling network events and messages.

#[cfg(feature = "websocket-server")]
use crate::network::ws;
#[cfg(feature = "ipfs")]
use crate::network::IpfsCli;
use crate::{
Expand Down Expand Up @@ -37,6 +39,27 @@ where
}

/// Event loop handler for [libp2p] network events and commands.
#[cfg(feature = "websocket-server")]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "websocket-server", feature = "websocket-notify")))
)]
#[allow(missing_debug_implementations, dead_code)]
pub(crate) struct EventHandler<DB: Database> {
receipt_quorum: usize,
workflow_quorum: usize,
p2p_provider_timeout: Duration,
db: DB,
swarm: Swarm<ComposedBehaviour>,
sender: Arc<mpsc::Sender<Event>>,
receiver: mpsc::Receiver<Event>,
query_senders: FnvHashMap<QueryId, (RequestResponseKey, P2PSender)>,
request_response_senders: FnvHashMap<RequestId, (RequestResponseKey, P2PSender)>,
ws_msg_sender: ws::Notifier,
}

/// Event loop handler for [libp2p] network events and commands.
#[cfg(not(feature = "websocket-server"))]
#[allow(missing_debug_implementations, dead_code)]
pub(crate) struct EventHandler<DB: Database> {
receipt_quorum: usize,
Expand All @@ -59,6 +82,30 @@ where
}

/// Create an [EventHandler] with channel sender/receiver defaults.
#[cfg(feature = "websocket-server")]
pub(crate) fn new(
swarm: Swarm<ComposedBehaviour>,
db: DB,
settings: &settings::Node,
ws_msg_sender: ws::Notifier,
) -> Self {
let (sender, receiver) = Self::setup_channel(settings);
Self {
receipt_quorum: settings.network.receipt_quorum,
workflow_quorum: settings.network.workflow_quorum,
p2p_provider_timeout: settings.network.p2p_provider_timeout,
db,
swarm,
sender: Arc::new(sender),
receiver,
query_senders: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
ws_msg_sender,
}
}

/// Create an [EventHandler] with channel sender/receiver defaults.
#[cfg(not(feature = "websocket-server"))]
pub(crate) fn new(swarm: Swarm<ComposedBehaviour>, db: DB, settings: &settings::Node) -> Self {
let (sender, receiver) = Self::setup_channel(settings);
Self {
Expand All @@ -85,6 +132,18 @@ where
self.sender.clone()
}

/// [tokio::sync::broadcast::Sender] for sending messages through the
/// webSocket server to subscribers.
#[cfg(all(feature = "websocket-server", feature = "websocket-notify"))]
#[cfg_attr(
docsrs,
doc(cfg(all(feature = "websocket-server", feature = "websocket-notify")))
)]
#[allow(dead_code)]
pub(crate) fn ws_sender(&self) -> ws::Notifier {
self.ws_msg_sender.clone()
}

/// Start [EventHandler] that matches on swarm and pubsub [events].
///
/// [events]: libp2p::swarm::SwarmEvent
Expand Down
Loading
Loading