diff --git a/.config/dictionaries/project.dic b/.config/dictionaries/project.dic index e9e3aff4e4..3a0d621fac 100644 --- a/.config/dictionaries/project.dic +++ b/.config/dictionaries/project.dic @@ -155,6 +155,7 @@ reentrancy removedir renameat reqwest +reusage rfind Rmax Rmin diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index d059844704..910e782c7a 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -1703,7 +1703,7 @@ checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" name = "cron_callback" version = "0.1.0" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", ] [[package]] @@ -2382,7 +2382,7 @@ dependencies = [ name = "failed_init" version = "0.1.0" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", ] [[package]] @@ -3121,6 +3121,7 @@ dependencies = [ "keyed-lock", "libsqlite3-sys", "libtest-mimic", + "multihash", "num-traits", "num_cpus", "once_cell", @@ -3357,7 +3358,7 @@ version = "0.1.0" dependencies = [ "serde_json", "url", - "wit-bindgen", + "wit-bindgen 0.46.0", ] [[package]] @@ -3775,6 +3776,13 @@ dependencies = [ "winreg 0.50.0", ] +[[package]] +name = "ipfs_subscribe" +version = "0.1.0" +dependencies = [ + "wit-bindgen 0.48.1", +] + [[package]] name = "ipld-core" version = "0.4.2" @@ -7628,7 +7636,7 @@ dependencies = [ "url", "utoipa", "uuid", - "wit-bindgen", + "wit-bindgen 0.46.0", ] [[package]] @@ -7695,7 +7703,7 @@ version = "0.1.0" dependencies = [ "serde_json", "url", - "wit-bindgen", + "wit-bindgen 0.46.0", ] [[package]] @@ -9009,7 +9017,7 @@ version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ - "wit-bindgen", + "wit-bindgen 0.46.0", ] [[package]] @@ -9102,6 +9110,18 @@ dependencies = [ "wasmparser 0.239.0", ] +[[package]] +name = "wasm-metadata" +version = "0.241.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876fe286f2fa416386deedebe8407e6f19e0b5aeaef3d03161e77a15fa80f167" +dependencies = [ + "anyhow", + "indexmap 2.12.1", + "wasm-encoder 0.241.2", + "wasmparser 0.241.2", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -9146,6 +9166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46d90019b1afd4b808c263e428de644f3003691f243387d30d673211ee0cb8e8" dependencies = [ "bitflags 2.10.0", + "hashbrown 0.15.5", "indexmap 2.12.1", "semver", ] @@ -9257,7 +9278,7 @@ dependencies = [ "syn 2.0.111", "wasmtime-internal-component-util", "wasmtime-internal-wit-bindgen", - "wit-parser", + "wit-parser 0.239.0", ] [[package]] @@ -9398,7 +9419,7 @@ dependencies = [ "bitflags 2.10.0", "heck 0.5.0", "indexmap 2.12.1", - "wit-parser", + "wit-parser 0.239.0", ] [[package]] @@ -9991,7 +10012,17 @@ dependencies = [ "bitflags 2.10.0", "futures", "once_cell", - "wit-bindgen-rust-macro", + "wit-bindgen-rust-macro 0.46.0", +] + +[[package]] +name = "wit-bindgen" +version = "0.48.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f8c2adb5f74ac9395bc3121c99a1254bf9310482c27b13f97167aedb5887138" +dependencies = [ + "bitflags 2.10.0", + "wit-bindgen-rust-macro 0.48.1", ] [[package]] @@ -10002,7 +10033,18 @@ checksum = "cabd629f94da277abc739c71353397046401518efb2c707669f805205f0b9890" dependencies = [ "anyhow", "heck 0.5.0", - "wit-parser", + "wit-parser 0.239.0", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.48.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b881a098cae03686d7a0587f8f306f8a58102ad8da8b5599100fbe0e7f5800b" +dependencies = [ + "anyhow", + "heck 0.5.0", + "wit-parser 0.241.2", ] [[package]] @@ -10016,9 +10058,25 @@ dependencies = [ "indexmap 2.12.1", "prettyplease", "syn 2.0.111", - "wasm-metadata", - "wit-bindgen-core", - "wit-component", + "wasm-metadata 0.239.0", + "wit-bindgen-core 0.46.0", + "wit-component 0.239.0", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.48.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69667efa439a453e1d50dac939c6cab6d2c3ac724a9d232b6631dad2472a5b70" +dependencies = [ + "anyhow", + "heck 0.5.0", + "indexmap 2.12.1", + "prettyplease", + "syn 2.0.111", + "wasm-metadata 0.241.2", + "wit-bindgen-core 0.48.1", + "wit-component 0.241.2", ] [[package]] @@ -10032,8 +10090,23 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.111", - "wit-bindgen-core", - "wit-bindgen-rust", + "wit-bindgen-core 0.46.0", + "wit-bindgen-rust 0.46.0", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.48.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae2e22cceb5d105d52326c07e3e67603a861cc7add70fc467f7cc7ec5265017" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.111", + "wit-bindgen-core 0.48.1", + "wit-bindgen-rust 0.48.1", ] [[package]] @@ -10050,9 +10123,28 @@ dependencies = [ "serde_derive", "serde_json", "wasm-encoder 0.239.0", - "wasm-metadata", + "wasm-metadata 0.239.0", "wasmparser 0.239.0", - "wit-parser", + "wit-parser 0.239.0", +] + +[[package]] +name = "wit-component" +version = "0.241.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0c57df25e7ee612d946d3b7646c1ddb2310f8280aa2c17e543b66e0812241" +dependencies = [ + "anyhow", + "bitflags 2.10.0", + "indexmap 2.12.1", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder 0.241.2", + "wasm-metadata 0.241.2", + "wasmparser 0.241.2", + "wit-parser 0.241.2", ] [[package]] @@ -10073,6 +10165,24 @@ dependencies = [ "wasmparser 0.239.0", ] +[[package]] +name = "wit-parser" +version = "0.241.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ef1c6ad67f35c831abd4039c02894de97034100899614d1c44e2268ad01c91" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.12.1", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser 0.241.2", +] + [[package]] name = "witx" version = "0.9.1" diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index c04e97fe93..80de811919 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -6,7 +6,8 @@ members = [ "bin/tests/integration/components/failed_init", "bin/tests/integration/components/sleep_component", "bin/tests/integration/components/cron_callback", - "bin/tests/integration/components/staked_ada_indexer_mock" + "bin/tests/integration/components/staked_ada_indexer_mock", + "bin/tests/integration/components/ipfs_subscribe", ] default-members = [ "bin", diff --git a/hermes/bin/Cargo.toml b/hermes/bin/Cargo.toml index 68da26e9ca..3ea2cdc257 100644 --- a/hermes/bin/Cargo.toml +++ b/hermes/bin/Cargo.toml @@ -102,6 +102,7 @@ orx-concurrent-vec = "3.10.0" keyed-lock = "0.2.3" wasmprinter = "0.240.0" wat = "1.239.0" +multihash = { version = "0.19.3", features = ["serde-codec"] } [build-dependencies] build-info-build = "0.0.42" diff --git a/hermes/bin/src/ipfs/api.rs b/hermes/bin/src/ipfs/api.rs index 2dac24e740..26c4c3edd5 100644 --- a/hermes/bin/src/ipfs/api.rs +++ b/hermes/bin/src/ipfs/api.rs @@ -118,7 +118,7 @@ pub(crate) fn hermes_ipfs_subscribe( } else { let handle = ipfs.pubsub_subscribe(&topic)?; ipfs.apps.added_topic_stream(topic.clone(), handle); - tracing::debug!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream"); + tracing::info!(app_name = %app_name, pubsub_topic = %topic, "added subscription topic stream"); } ipfs.apps .added_app_topic_subscription(app_name.clone(), topic); diff --git a/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs b/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs index 74ba03f20c..cedce3b55a 100644 --- a/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs +++ b/hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs @@ -1,22 +1,79 @@ //! Doc Sync host module. +use cardano_chain_follower::pallas_codec::minicbor::{self, Encode, Encoder, data::Tag}; +use stringzilla::stringzilla::Sha256; use wasmtime::component::Resource; use crate::{ + ipfs::hermes_ipfs_subscribe, runtime_context::HermesRuntimeContext, - runtime_extensions::bindings::hermes::doc_sync::api::{ - ChannelName, DocData, DocLoc, DocProof, Errno, Host, HostSyncChannel, ProverId, SyncChannel, + runtime_extensions::{ + bindings::hermes::doc_sync::api::{ + ChannelName, DocData, DocLoc, DocProof, Errno, Host, HostSyncChannel, ProverId, + SyncChannel, + }, + hermes::doc_sync::DOC_SYNC_STATE, }, }; +/// CBOR multicodec identifier. +/// +/// See: +const CBOR_CODEC: u64 = 0x51; + +/// SHA2-256 multihash code. +const SHA2_256_CODE: u64 = 0x12; + +/// CBOR tag for IPLD CID (Content Identifier). +/// +/// See: +const CID_CBOR_TAG: u64 = 42; + +/// Wrapper for `hermes_ipfs::Cid` to implement `minicbor::Encode` for it. +struct Cid(hermes_ipfs::Cid); + +impl minicbor::Encode<()> for Cid { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), minicbor::encode::Error> { + // Encode as tag(42) containing the CID bytes + e.tag(Tag::new(CID_CBOR_TAG))?; + e.bytes(&self.0.to_bytes())?; + Ok(()) + } +} + #[allow(clippy::todo)] impl Host for HermesRuntimeContext { /// Get the Document ID for the given Binary Document + /// + /// See: + /// + /// # Note + /// + /// We expect to receive doc as cbor bytes. fn id_for( &mut self, - _doc: DocData, + doc: DocData, ) -> wasmtime::Result> { - todo!() + // Compute SHA2-256 hash + let mut hasher = Sha256::new(); + hasher.update(&doc); + let hash_digest = hasher.digest(); + + // Create multihash from digest using the wrap() API + // The generic parameter <64> is the max digest size we support + let multihash = multihash::Multihash::<64>::wrap(SHA2_256_CODE, &hash_digest)?; + + // Create CID v1 with CBOR codec + let cid = hermes_ipfs::Cid::new_v1(CBOR_CODEC, multihash); + + let mut e = minicbor::Encoder::new(Vec::new()); + Cid(cid).encode(&mut e, &mut ())?; + + Ok(e.into_writer()) } } @@ -35,9 +92,43 @@ impl HostSyncChannel for HermesRuntimeContext { /// - `error(create-network-error)`: If creating network resource failed. fn new( &mut self, - _name: ChannelName, + name: ChannelName, ) -> wasmtime::Result> { - todo!() + let hash = blake2b_simd::Params::new() + .hash_length(4) + .hash(name.as_bytes()); + + // The digest is a 64-byte array ([u8; 64]) for 512-bit output. + // Take the first 4 bytes to use them as resource id. + // + // Assumption: + // Number of channels is way more less then u32, so collisions are + // acceptable but unlikely in practice. We use the first 4 bytes of + // the cryptographically secure Blake2b hash as a fast, 32-bit ID + // to minimize lock contention when accessing state via DOC_SYNC_STATE. + let prefix_bytes: &[u8; 4] = hash.as_bytes().try_into().map_err(|err| { + wasmtime::Error::msg(format!("BLAKE2b hash output length must be 4 bytes: {err}")) + })?; + + let resource: u32 = u32::from_be_bytes(*prefix_bytes); + + // Code block is used to minimize locking scope. + { + let entry = DOC_SYNC_STATE.entry(resource).or_insert(name.clone()); + if &name != entry.value() { + return Err(wasmtime::Error::msg(format!( + "Collision occurred with previous value = {} and new one = {name}", + entry.value() + ))); + } + } + + if let Err(err) = hermes_ipfs_subscribe(self.app_name(), name) { + DOC_SYNC_STATE.remove(&resource); + return Err(wasmtime::Error::msg(format!("Subscription failed: {err}",))); + } + + Ok(wasmtime::component::Resource::new_own(resource)) } /// Close Doc Sync Channel @@ -46,7 +137,7 @@ impl HostSyncChannel for HermesRuntimeContext { /// (and all docs stored are released) /// Close itself should be deferred until all running WASM modules with an open /// `sync-channel` resource have terminated. - /// + /// /// **Parameters** /// /// None @@ -57,14 +148,13 @@ impl HostSyncChannel for HermesRuntimeContext { /// - `error()`: If it gets an error closing. fn close( &mut self, - _self_: Resource, - _name: ChannelName, + self_: Resource, ) -> wasmtime::Result> { - todo!() + inner_close(self, self_) } /// Post the document to a channel - /// + /// /// **Parameters** /// /// None @@ -82,7 +172,7 @@ impl HostSyncChannel for HermesRuntimeContext { } /// Prove a document is stored in the provers - /// + /// /// **Parameters** /// /// loc : Location ID of the document to prove storage of. @@ -104,7 +194,7 @@ impl HostSyncChannel for HermesRuntimeContext { } /// Disprove a document is stored in the provers - /// + /// /// **Parameters** /// /// loc : Location ID of the document to prove storage of. @@ -126,7 +216,7 @@ impl HostSyncChannel for HermesRuntimeContext { } /// Prove a document is stored in the provers - /// + /// /// **Parameters** /// /// None @@ -146,8 +236,22 @@ impl HostSyncChannel for HermesRuntimeContext { /// Wasmtime resource drop callback. fn drop( &mut self, - _rep: Resource, + res: Resource, ) -> wasmtime::Result<()> { - todo!() + inner_close(self, res)??; + + Ok(()) } } + +/// This function is required cause reusage of `self.close` +/// inside drop causes invalid behavior during codegen. +#[allow(clippy::unnecessary_wraps)] +fn inner_close( + _ctx: &mut HermesRuntimeContext, + _res: Resource, +) -> wasmtime::Result> { + // TODO(anyone): Here we should clean up the state, since we would have a map that + // associates app_name with app's subscriptions. + Ok(Ok(true)) +} diff --git a/hermes/bin/src/runtime_extensions/hermes/doc_sync/mod.rs b/hermes/bin/src/runtime_extensions/hermes/doc_sync/mod.rs index 866c17c60d..f422e60de9 100644 --- a/hermes/bin/src/runtime_extensions/hermes/doc_sync/mod.rs +++ b/hermes/bin/src/runtime_extensions/hermes/doc_sync/mod.rs @@ -1,4 +1,18 @@ //! Doc Sync extension implementation. +use dashmap::DashMap; +use once_cell::sync::Lazy; + mod event; mod host; + +/// Initialize state. Which is mapping from String hash to String itself. +/// +/// Note: +/// +/// If large amount of sync channels is expected it would lead to great +/// amount of collision, so should be more strictly stored. +pub(super) type State = DashMap; + +/// Global state to hold the resources. +static DOC_SYNC_STATE: Lazy = Lazy::new(DashMap::new); diff --git a/hermes/bin/tests/integration/components/ipfs_subscribe/Cargo.toml b/hermes/bin/tests/integration/components/ipfs_subscribe/Cargo.toml new file mode 100644 index 0000000000..7c7c8873fa --- /dev/null +++ b/hermes/bin/tests/integration/components/ipfs_subscribe/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ipfs_subscribe" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wit-bindgen = "0.48.0" + +[lints] +workspace = true \ No newline at end of file diff --git a/hermes/bin/tests/integration/components/ipfs_subscribe/src/lib.rs b/hermes/bin/tests/integration/components/ipfs_subscribe/src/lib.rs new file mode 100644 index 0000000000..f9dff7d819 --- /dev/null +++ b/hermes/bin/tests/integration/components/ipfs_subscribe/src/lib.rs @@ -0,0 +1,36 @@ +//! The test Hermes App. +#![allow( + clippy::missing_safety_doc, + clippy::missing_docs_in_private_items, + clippy::expect_used, + clippy::panic +)] + +mod bindings { + + wit_bindgen::generate!({ + world: "hermes:app/hermes", + path: "../../../../../../wasm/wasi/wit", + inline: " + package hermes:app; + + world hermes { + import hermes:doc-sync/api; + + export hermes:init/event; + } + ", + generate_all, + }); +} + +struct IPFSSubscribeApp; + +impl bindings::exports::hermes::init::event::Guest for IPFSSubscribeApp { + fn init() -> bool { + bindings::hermes::doc_sync::api::SyncChannel::new("ipfs_channel"); + false + } +} + +bindings::export!(IPFSSubscribeApp with_types_in bindings); diff --git a/hermes/bin/tests/integration/tests/serial/ipfs_subscribe.rs b/hermes/bin/tests/integration/tests/serial/ipfs_subscribe.rs new file mode 100644 index 0000000000..f056099049 --- /dev/null +++ b/hermes/bin/tests/integration/tests/serial/ipfs_subscribe.rs @@ -0,0 +1,37 @@ +use serial_test::serial; +use temp_dir::TempDir; + +use crate::utils; + +#[test] +#[serial] +fn ipfs_subscribe() { + const COMPONENT: &str = "ipfs_subscribe"; + const MODULE: &str = "ipfs_subscribe_module"; + + let temp_dir = TempDir::new().unwrap(); + utils::component::build(COMPONENT, &temp_dir).expect("failed to build component"); + + let app_file_name = + utils::packaging::package(&temp_dir, COMPONENT, MODULE).expect("failed to package app"); + + // TODO[RC]: Build hermes just once for all tests + utils::hermes::build(); + + utils::hermes::run_app(&temp_dir, &app_file_name).expect_err("should fail to run hermes app"); + + assert!(utils::assert::app_logs_contain( + &temp_dir, + "added subscription topic stream" + )); + + assert!(utils::assert::app_logs_contain( + &temp_dir, + "\"pubsub_topic\":\"ipfs_channel\"" + )); + + // Uncomment the line below if you want to inspect the details + // available in the temp directory. + // + // utils::debug_sleep(&temp_dir); +} diff --git a/hermes/bin/tests/integration/tests/serial/mod.rs b/hermes/bin/tests/integration/tests/serial/mod.rs index 80ec9506ce..6ca34666bb 100644 --- a/hermes/bin/tests/integration/tests/serial/mod.rs +++ b/hermes/bin/tests/integration/tests/serial/mod.rs @@ -7,5 +7,6 @@ mod athena; mod cron_callback; mod failed_module_init; mod http_request_rte; +mod ipfs_subscribe; mod parallel_module_execution; mod staked_ada; diff --git a/wasm/wasi/wit/deps/hermes-doc-sync/api.wit b/wasm/wasi/wit/deps/hermes-doc-sync/api.wit index 6e3270f6e0..b5c937f9de 100644 --- a/wasm/wasi/wit/deps/hermes-doc-sync/api.wit +++ b/wasm/wasi/wit/deps/hermes-doc-sync/api.wit @@ -34,76 +34,76 @@ interface api { /// **Parameters** /// /// - `name`: The Name of the channel to Open. Creates if it doesn't exist, otherwise joins it. - /// + /// /// **Returns** - /// + /// /// - `ok(network)`: A resource network, if successfully create network resource. /// - `error(create-network-error)`: If creating network resource failed. constructor(name: channel-name); /// Close Doc Sync Channel /// - /// Can't use the sync-channel anymore after its closed + /// Can't use the sync-channel anymore after its closed /// (and all docs stored are released) /// Close itself should be deferred until all running WASM modules with an open `sync-channel` /// resource have terminated. - /// + /// /// **Parameters** /// /// None - /// + /// /// **Returns** - /// + /// /// - `ok(true)`: Channel Closed and resources released. /// - `error()`: If it gets an error closing. - close: func(name: channel-name) -> result; + close: static func(self: sync-channel) -> result; /// Post the document to a channel - /// + /// /// **Parameters** /// /// None - /// + /// /// **Returns** - /// + /// /// - `ok(true)`: Channel Closed and resources released. /// - `error()`: If it gets an error closing. post: func(doc: doc-data) -> result; /// Prove a document is stored in the provers - /// + /// /// **Parameters** /// /// loc : Location ID of the document to prove storage of. /// provers: List of provers to prove against (if empty, all provers will be requested for proof.) - /// + /// /// **Returns** - /// + /// /// - `ok(list of proofs received [prover id inside the proof])`: Document stored OK or Not based on proof. /// - `error()`: If it gets an error. prove-includes: func(loc: doc-loc, provers: list) -> result, errno>; /// Disprove a document is stored in the provers - /// + /// /// **Parameters** /// /// loc : Location ID of the document to prove storage of. /// provers: List of provers to prove against (if empty, all provers will be requested for proof.) - /// + /// /// **Returns** - /// + /// /// - `ok(list of proofs received [prover id inside the proof])`: Document stored OK or Not based on proof. /// - `error()`: If it gets an error. prove-excludes: func(loc: doc-loc, provers: list) -> result, errno>; /// Prove a document is stored in the provers - /// + /// /// **Parameters** /// /// None - /// + /// /// **Returns** - /// + /// /// - `ok(doc-data)`: Data associated with that document location, if it exists. /// - `error()`: If it gets an error. get: func(loc: doc-loc) -> result;