Skip to content

Commit

Permalink
A PATCH endpoint for the content-addressable store (#67)
Browse files Browse the repository at this point in the history
`utils::diffs::make_patch()` and `utils::diffs::apply_patch()` now exist to make and apply compressed binary patches. The intended use case is to send executable deltas around instead of full payloads. These use the new dependency [qbsdiff](https://lib.rs/crates/qbsdiff) to generate and apply compressed binary patches.

Implemented `PATCH /v1/storage/data/*address` using the `apply_patch()` function. It's up to you
to generate the patch that gets applied somehow. (qbsdiff has command-line tools for this.)

Renamed some storage functions to make it clear that they return streams, to distinguish them from ones that return vecs of u8.
  • Loading branch information
ceejbot committed May 24, 2023
1 parent 53904b8 commit 9e3c487
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 11 deletions.
63 changes: 62 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dotenvy = "0.15.6"
env_logger = "0.10.0"
log = "0.4.17"
reqwest = { version = "0.11.13", default-features = false, features = ["brotli", "json", "rustls-tls", "stream", "native-tls-vendored"] }
qbsdiff = "1.4.0"
serde = { version = "1.0.149", features = ["derive"] }
serde_json = "1.0.89"
ssri = "8.0.0"
Expand Down
44 changes: 42 additions & 2 deletions agent/src/api/v1/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use axum::body::{Body, Bytes};
use axum::extract::{Path, State};
use axum::http::{header, Request, StatusCode};
use axum::response::IntoResponse;
use axum::routing::{any, get, head, post, put};
use axum::routing::{any, get, head, patch, post, put};
use ssri::Integrity;
use utils::diffs::apply_patch;
use utils::errors::ServalError;
use utils::mesh::ServalRole;
use utils::structs::Manifest;
Expand All @@ -28,6 +29,7 @@ pub fn mount(router: ServalRouter) -> ServalRouter {
.route("/v1/storage/data", post(store_by_content_address))
.route("/v1/storage/data/*address", get(get_by_content_address))
.route("/v1/storage/data/*address", head(has_content_address))
.route("/v1/storage/data/*address", patch(patch_content_at_address))
}

/// Mount a handler for all storage routes that relays requests to a node that can handle them.
Expand Down Expand Up @@ -87,7 +89,7 @@ async fn get_by_content_address(Path(address): Path<String>) -> impl IntoRespons
return e.into_response()
};

match storage.data_by_integrity(integrity).await {
match storage.stream_by_integrity(integrity).await {
Ok(stream) => {
let headers = [(
header::CONTENT_TYPE,
Expand Down Expand Up @@ -136,6 +138,44 @@ async fn has_content_address(Path(address): Path<String>) -> impl IntoResponse {
}
}

async fn patch_content_at_address(Path(address): Path<String>, body: Bytes) -> impl IntoResponse {
metrics::increment_counter!("storage:cas:patch");
let Some(storage) = STORAGE.get() else {
return (StatusCode::SERVICE_UNAVAILABLE, "storage uninitialized; programmer error".to_string()).into_response();
};

let Ok(integrity) = address.parse::<Integrity>() else {
let e = ServalError::BlobAddressInvalid(format!("{} is not a valid sub-resource integrity string", address));
return e.into_response()
};

match storage.data_by_integrity(integrity).await {
Ok(original) => {
log::info!("Patching CAS data; address={}", &address);
let Ok(updated)= apply_patch(&original, &body) else {
return ServalError::StorageError("patch could not be applied".to_string()).into_response();
};
match storage.store_by_integrity(&updated).await {
Ok(integrity) => {
log::info!(
"Stored updated patch in CAS storage; old_integrity={}; new_integrity={}; size={}",
address,
integrity,
updated.len()
);
(StatusCode::CREATED, integrity.to_string()).into_response()
}
Err(e) => e.into_response(),
}
}
Err(ServalError::DataNotFound(s)) => (StatusCode::NOT_FOUND, s).into_response(),
Err(e) => {
log::info!("Error serving CAS data; address={}; error={}", &address, e);
e.into_response()
}
}
}

/// Fetch an executable by fully-qualified manifest name.
async fn get_executable(
Path((name, version)): Path<(String, String)>,
Expand Down
2 changes: 1 addition & 1 deletion agent/src/storage/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl BlobStore {

/// Given a content address, return a read stream for the object stored there.
/// Responds with an error if no object is found or if the address is invalid.
pub async fn data_by_integrity(
pub async fn stream_by_integrity(
&self,
integrity: &Integrity,
) -> ServalResult<ReaderStream<SendableStream>> {
Expand Down
2 changes: 1 addition & 1 deletion agent/src/storage/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl S3Storage {
}

/// Check if the given data blob is present in our data store, by integrity hash. Returns a stream.
pub async fn data_by_integrity(&self, integrity: &Integrity) -> ServalResult<ByteStream> {
pub async fn stream_by_integrity(&self, integrity: &Integrity) -> ServalResult<ByteStream> {
let object = self
.client
.get_object()
Expand Down
35 changes: 31 additions & 4 deletions agent/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,26 @@ impl Storage {
}
}

pub async fn data_by_integrity(
pub async fn stream_by_integrity(
&self,
integrity: Integrity,
) -> ServalResult<StreamBody<ReaderStream<SendableStream>>> {
if !self.has_storage() {
let proxy = make_proxy_client().await?;
let bytes = proxy.data_by_integrity(&integrity.to_string()).await?;
let bytes = proxy.stream_by_integrity(&integrity.to_string()).await?;
let reader = ReaderStream::new(vec_to_byte_stream(bytes));
return Ok(StreamBody::new(reader));
}

if let Some(local) = &self.local {
if let Ok(v) = local.data_by_integrity(&integrity).await {
if let Ok(v) = local.stream_by_integrity(&integrity).await {
log::info!("serving from local blobs; {integrity}");
return Ok(StreamBody::new(v));
}
}

if let Some(bucket) = &self.bucket {
if let Ok(bytestream) = bucket.data_by_integrity(&integrity).await {
if let Ok(bytestream) = bucket.stream_by_integrity(&integrity).await {
log::info!("serving from s3 bucket; {integrity}");
let readable = bytestream.into_async_read();
let pinned: SendableStream = Box::pin(readable);
Expand All @@ -155,6 +155,33 @@ impl Storage {
Err(ServalError::DataNotFound(integrity.to_string()))
}

/// Read the complete blob identified by `integrity` into a byte vector.
///
/// When `has_storage()` reports false the request is forwarded to a proxy client. Otherwise
/// the local blob store is consulted first and the bucket second; a failed lookup in one
/// source simply falls through to the next. Returns `ServalError::DataNotFound` carrying the
/// integrity string when no source yields the data.
pub async fn data_by_integrity(&self, integrity: Integrity) -> ServalResult<Vec<u8>> {
    let key = integrity.to_string();

    if !self.has_storage() {
        let client = make_proxy_client().await?;
        return Ok(client.stream_by_integrity(&key).await?);
    }

    // Lookup errors are deliberately dropped so the next source gets a chance.
    let from_local = match &self.local {
        Some(local) => local.data_by_key(&key).await.ok(),
        None => None,
    };
    if let Some(bytes) = from_local {
        log::info!("serving from local blobs; {integrity}");
        return Ok(bytes);
    }

    let from_bucket = match &self.bucket {
        Some(bucket) => bucket.data_by_key(&key).await.ok(),
        None => None,
    };
    if let Some(bytes) = from_bucket {
        log::info!("serving from s3 bucket; {integrity}");
        return Ok(bytes);
    }

    Err(ServalError::DataNotFound(key))
}

/// Check if the given manifest is present in our store, using the fully-qualified name.
///
/// Never checks a proxy; this is intended to be a local check.
Expand Down
2 changes: 1 addition & 1 deletion api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl ServalApiClient {
}
}

pub async fn data_by_integrity(&self, address: &str) -> ApiResult<Vec<u8>> {
pub async fn stream_by_integrity(&self, address: &str) -> ApiResult<Vec<u8>> {
let url = self.build_url(&format!("storage/data/{address}"));
let response = reqwest::get(&url).await?;
if response.status().is_success() {
Expand Down
5 changes: 4 additions & 1 deletion utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ log = { workspace = true }
once_cell = "1.17.1"
regex = "1.7.3"
reqwest = { workspace = true }
qbsdiff = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = "0.10.6"
Expand All @@ -29,4 +30,6 @@ tokio = { workspace = true }
tokio-util = { workspace = true }
toml = { workspace = true }
uuid = { workspace = true }
wasi-common = { workspace = true }

[dev-dependencies]
ssri = { workspace = true }
53 changes: 53 additions & 0 deletions utils/src/diffs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::io::Cursor;

use qbsdiff::{Bsdiff, Bspatch};

use crate::errors::ServalResult;

/// Produce a binary patch that turns `source` into `target`.
///
/// The patch is generated with qbsdiff's `Bsdiff` and returned as a freshly allocated byte
/// vector. Any error reported while generating the patch is propagated to the caller.
pub fn make_patch(source: &[u8], target: &[u8]) -> ServalResult<Vec<u8>> {
    let mut delta: Vec<u8> = Vec::new();
    let differ = Bsdiff::new(source, target);
    differ.compare(Cursor::new(&mut delta))?;
    Ok(delta)
}

/// Apply a binary `patch` (as produced by [`make_patch`]) to `source` and return the result.
///
/// # Errors
///
/// Propagates any error reported by qbsdiff while parsing the patch or applying it.
pub fn apply_patch(source: &[u8], patch: &[u8]) -> ServalResult<Vec<u8>> {
    // The size hint comes straight from the patch header, and patches can arrive from the
    // network. Trusting it blindly would let a forged header request an enormous allocation
    // (capacity-overflow panic or allocation abort) before any patch data is processed, so
    // only preallocate up to a sane bound; the vector still grows on demand beyond that.
    const MAX_PREALLOCATION: u64 = 64 * 1024 * 1024;

    let patcher = Bspatch::new(patch)?;
    // Lossless cast: the value has just been clamped to MAX_PREALLOCATION.
    let capacity = patcher.hint_target_size().min(MAX_PREALLOCATION) as usize;
    let mut target = Vec::with_capacity(capacity);
    patcher.apply(source, Cursor::new(&mut target))?;
    Ok(target)
}

#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io::{BufReader, Read};

    use ssri::Integrity;

    use super::{apply_patch, make_patch};

    /// Read an entire fixture file into memory, panicking with `missing` if it cannot be opened.
    fn load_fixture(path: &str, missing: &str) -> Vec<u8> {
        let mut contents: Vec<u8> = Vec::new();
        let handle = File::open(path).expect(missing);
        BufReader::new(handle)
            .read_to_end(&mut contents)
            .expect("fixture should be readable");
        contents
    }

    #[test]
    fn patch_integrity() {
        // Round trip: a patch made from v1 -> v2, applied to v1, must reproduce v2 exactly.
        let before = load_fixture("./tests/fixtures/serval-facts-1.wasm", "fixture 1 missing!");
        let after = load_fixture("./tests/fixtures/serval-facts-2.wasm", "fixture 2 missing!");

        let delta = make_patch(&before, &after).expect("creating the patch failed!");
        let rebuilt = apply_patch(&before, &delta).expect("applying the patch failed!");

        // Compare by integrity hash rather than raw bytes, as the storage layer does.
        let source_sri = Integrity::from(after);
        let patched_sri = Integrity::from(rebuilt);
        assert_eq!(source_sri, patched_sri);
    }
}
1 change: 1 addition & 0 deletions utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod diffs;
pub mod errors;
pub mod futures;
pub mod mesh;
Expand Down
Binary file added utils/tests/fixtures/serval-facts-1.wasm
Binary file not shown.
Binary file added utils/tests/fixtures/serval-facts-2.wasm
Binary file not shown.

0 comments on commit 9e3c487

Please sign in to comment.