diff --git a/Cargo.lock b/Cargo.lock index f65cddb..22f7d0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -717,6 +717,27 @@ dependencies = [ "either", ] +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cacache" version = "11.5.2" @@ -815,6 +836,16 @@ dependencies = [ "jobserver", ] +[[package]] +name = "cdivsufsort" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edefce019197609da416762da75bb000bbd2224b2d89a7e722c2296cbff79b8c" +dependencies = [ + "cc", + "sacabase", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -2397,6 +2428,18 @@ dependencies = [ "unicase", ] +[[package]] +name = "qbsdiff" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265fc0efc3effc503295e3138ab93bbcdc2da22006b2019188248b23b0ebb9e0" +dependencies = [ + "byteorder", + "bzip2", + "rayon", + "suffix_array", +] + [[package]] name = "quote" version = "1.0.26" @@ -2677,6 +2720,15 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +[[package]] +name = "sacabase" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9883fc3d6ce3d78bb54d908602f8bc1f7b5f983afe601dabe083009d86267a84" +dependencies = [ + "num-traits", +] + [[package]] name = "same-file" version = "1.0.6" @@ -3003,6 +3055,15 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +[[package]] +name = "suffix_array" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907d9ca9637a22e3a7d7c7818f6105a7898857359e187ad3325d986684b9ec3f" +dependencies = [ + "cdivsufsort", +] + [[package]] name = "syn" version = "1.0.109" @@ -3466,6 +3527,7 @@ dependencies = [ "kaboodle", "log", "once_cell", + "qbsdiff", "regex", "reqwest", "serde", @@ -3477,7 +3539,6 @@ dependencies = [ "tokio-util", "toml 0.7.3", "uuid", - "wasi-common", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d0c6898..ca9cf13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ dotenvy = "0.15.6" env_logger = "0.10.0" log = "0.4.17" reqwest = { version = "0.11.13", default-features = false, features = ["brotli", "json", "rustls-tls", "stream", "native-tls-vendored"] } +qbsdiff = "1.4.0" serde = { version = "1.0.149", features = ["derive"] } serde_json = "1.0.89" ssri = "8.0.0" diff --git a/agent/src/api/v1/storage.rs b/agent/src/api/v1/storage.rs index e517666..082971d 100644 --- a/agent/src/api/v1/storage.rs +++ b/agent/src/api/v1/storage.rs @@ -2,8 +2,9 @@ use axum::body::{Body, Bytes}; use axum::extract::{Path, State}; use axum::http::{header, Request, StatusCode}; use axum::response::IntoResponse; -use axum::routing::{any, get, head, post, put}; +use axum::routing::{any, get, head, patch, post, put}; use ssri::Integrity; +use utils::diffs::apply_patch; use utils::errors::ServalError; use utils::mesh::ServalRole; use utils::structs::Manifest; @@ -28,6 +29,7 @@ pub fn mount(router: ServalRouter) -> ServalRouter { .route("/v1/storage/data", post(store_by_content_address)) .route("/v1/storage/data/*address", get(get_by_content_address)) .route("/v1/storage/data/*address", head(has_content_address)) + .route("/v1/storage/data/*address", patch(patch_content_at_address)) } /// Mount a handler for all storage routes that relays requests to a node that can handle them. @@ -87,7 +89,7 @@ async fn get_by_content_address(Path(address): Path) -> impl IntoRespons return e.into_response() }; - match storage.data_by_integrity(integrity).await { + match storage.stream_by_integrity(integrity).await { Ok(stream) => { let headers = [( header::CONTENT_TYPE, @@ -136,6 +138,44 @@ async fn has_content_address(Path(address): Path) -> impl IntoResponse { } } +async fn patch_content_at_address(Path(address): Path, body: Bytes) -> impl IntoResponse { + metrics::increment_counter!("storage:cas:patch"); + let Some(storage) = STORAGE.get() else { + return (StatusCode::SERVICE_UNAVAILABLE, "storage uninitialized; programmer error".to_string()).into_response(); + }; + + let Ok(integrity) = address.parse::() else { + let e = ServalError::BlobAddressInvalid(format!("{} is not a valid sub-resource integrity string", address)); + return e.into_response() + }; + + match storage.data_by_integrity(integrity).await { + Ok(original) => { + log::info!("Patching CAS data; address={}", &address); + let Ok(updated)= apply_patch(&original, &body) else { + return ServalError::StorageError("patch could not be applied".to_string()).into_response(); + }; + match storage.store_by_integrity(&updated).await { + Ok(integrity) => { + log::info!( + "Stored updated patch in CAS storage; old_integrity={}; new_integrity={}; size={}", + address, + integrity, + updated.len() + ); + (StatusCode::CREATED, integrity.to_string()).into_response() + } + Err(e) => e.into_response(), + } + } + Err(ServalError::DataNotFound(s)) => (StatusCode::NOT_FOUND, s).into_response(), + Err(e) => { + log::info!("Error serving CAS data; address={}; error={}", &address, e); + e.into_response() + } + } +} + /// Fetch an executable by fully-qualified manifest name. async fn get_executable( Path((name, version)): Path<(String, String)>, diff --git a/agent/src/storage/blobs.rs b/agent/src/storage/blobs.rs index fdea465..3334949 100644 --- a/agent/src/storage/blobs.rs +++ b/agent/src/storage/blobs.rs @@ -51,7 +51,7 @@ impl BlobStore { /// Given a content address, return a read stream for the object stored there. /// Responds with an error if no object is found or if the address is invalid. - pub async fn data_by_integrity( + pub async fn stream_by_integrity( &self, integrity: &Integrity, ) -> ServalResult> { diff --git a/agent/src/storage/bucket.rs b/agent/src/storage/bucket.rs index 0113463..3814e06 100644 --- a/agent/src/storage/bucket.rs +++ b/agent/src/storage/bucket.rs @@ -22,7 +22,7 @@ impl S3Storage { } /// Check if the given data blob is present in our data store, by integrity hash. Returns a stream. - pub async fn data_by_integrity(&self, integrity: &Integrity) -> ServalResult { + pub async fn stream_by_integrity(&self, integrity: &Integrity) -> ServalResult { let object = self .client .get_object() diff --git a/agent/src/storage/mod.rs b/agent/src/storage/mod.rs index 3e6c3f0..0bb21f0 100644 --- a/agent/src/storage/mod.rs +++ b/agent/src/storage/mod.rs @@ -124,26 +124,26 @@ impl Storage { } } - pub async fn data_by_integrity( + pub async fn stream_by_integrity( &self, integrity: Integrity, ) -> ServalResult>> { if !self.has_storage() { let proxy = make_proxy_client().await?; - let bytes = proxy.data_by_integrity(&integrity.to_string()).await?; + let bytes = proxy.stream_by_integrity(&integrity.to_string()).await?; let reader = ReaderStream::new(vec_to_byte_stream(bytes)); return Ok(StreamBody::new(reader)); } if let Some(local) = &self.local { - if let Ok(v) = local.data_by_integrity(&integrity).await { + if let Ok(v) = local.stream_by_integrity(&integrity).await { log::info!("serving from local blobs; {integrity}"); return Ok(StreamBody::new(v)); } } if let Some(bucket) = &self.bucket { - if let Ok(bytestream) = bucket.data_by_integrity(&integrity).await { + if let Ok(bytestream) = bucket.stream_by_integrity(&integrity).await { log::info!("serving from s3 bucket; {integrity}"); let readable = bytestream.into_async_read(); let pinned: SendableStream = Box::pin(readable); @@ -155,6 +155,33 @@ impl Storage { Err(ServalError::DataNotFound(integrity.to_string())) } + /// Load data by its integrity into memory. + pub async fn data_by_integrity(&self, integrity: Integrity) -> ServalResult> { + let integrity_string = integrity.to_string(); + + if !self.has_storage() { + let proxy = make_proxy_client().await?; + let bytes = proxy.stream_by_integrity(&integrity_string).await?; + return Ok(bytes); + } + + if let Some(local) = &self.local { + if let Ok(bytes) = local.data_by_key(&integrity_string).await { + log::info!("serving from local blobs; {integrity}"); + return Ok(bytes); + } + } + + if let Some(bucket) = &self.bucket { + if let Ok(bytes) = bucket.data_by_key(&integrity_string).await { + log::info!("serving from s3 bucket; {integrity}"); + return Ok(bytes); + } + } + + Err(ServalError::DataNotFound(integrity.to_string())) + } + /// Check if the given manifest is present in our store, using the fully-qualified name. /// /// Never checks a proxy; this is intended to be a local check. diff --git a/api-client/src/lib.rs b/api-client/src/lib.rs index 7cc583b..2d20b48 100644 --- a/api-client/src/lib.rs +++ b/api-client/src/lib.rs @@ -182,7 +182,7 @@ impl ServalApiClient { } } - pub async fn data_by_integrity(&self, address: &str) -> ApiResult> { + pub async fn stream_by_integrity(&self, address: &str) -> ApiResult> { let url = self.build_url(&format!("storage/data/{address}")); let response = reqwest::get(&url).await?; if response.status().is_success() { diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 78dd675..7a3ba8d 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -20,6 +20,7 @@ log = { workspace = true } once_cell = "1.17.1" regex = "1.7.3" reqwest = { workspace = true } +qbsdiff = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } sha2 = "0.10.6" @@ -29,4 +30,6 @@ tokio = { workspace = true } tokio-util = { workspace = true } toml = { workspace = true } uuid = { workspace = true } -wasi-common = { workspace = true } + +[dev-dependencies] +ssri = { workspace = true } diff --git a/utils/src/diffs.rs b/utils/src/diffs.rs new file mode 100644 index 0000000..a5c1ab3 --- /dev/null +++ b/utils/src/diffs.rs @@ -0,0 +1,53 @@ +use std::io::Cursor; + +use qbsdiff::{Bsdiff, Bspatch}; + +use crate::errors::ServalResult; + +pub fn make_patch(source: &[u8], target: &[u8]) -> ServalResult> { + let mut patch = Vec::new(); + Bsdiff::new(source, target).compare(Cursor::new(&mut patch))?; + Ok(patch) +} + +pub fn apply_patch(source: &[u8], patch: &[u8]) -> ServalResult> { + let patcher = Bspatch::new(patch)?; + let mut target = Vec::with_capacity(patcher.hint_target_size() as usize); + patcher.apply(source, Cursor::new(&mut target))?; + Ok(target) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::{BufReader, Read}; + + use ssri::Integrity; + + use super::{apply_patch, make_patch}; + + #[test] + fn patch_integrity() { + // verify that our patcher does what it promises. + let mut version1: Vec = Vec::new(); + let file = File::open("./tests/fixtures/serval-facts-1.wasm").expect("fixture 1 missing!"); + let mut reader = BufReader::new(file); + reader + .read_to_end(&mut version1) + .expect("fixture should be readable"); + + let mut version2: Vec = Vec::new(); + let file = File::open("./tests/fixtures/serval-facts-2.wasm").expect("fixture 2 missing!"); + let mut reader = BufReader::new(file); + reader + .read_to_end(&mut version2) + .expect("fixture should be readable"); + + let patch = make_patch(&version1, &version2).expect("creating the patch failed!"); + let patched = apply_patch(&version1, &patch).expect("applying the patch failed!"); + + let source_sri = Integrity::from(version2); + let patched_sri = Integrity::from(patched); + assert_eq!(source_sri, patched_sri); + } +} diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 01b677c..bb9cfdc 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -1,3 +1,4 @@ +pub mod diffs; pub mod errors; pub mod futures; pub mod mesh; diff --git a/utils/tests/fixtures/serval-facts-1.wasm b/utils/tests/fixtures/serval-facts-1.wasm new file mode 100755 index 0000000..af024dc Binary files /dev/null and b/utils/tests/fixtures/serval-facts-1.wasm differ diff --git a/utils/tests/fixtures/serval-facts-2.wasm b/utils/tests/fixtures/serval-facts-2.wasm new file mode 100755 index 0000000..2899b6f Binary files /dev/null and b/utils/tests/fixtures/serval-facts-2.wasm differ