Skip to content

Commit

Permalink
Introduce the concept of a WASM manifest; use cacache for local stora…
Browse files Browse the repository at this point in the history
…ge (#39)

This PR reworks the agent API and the cli to support a new concept of a "manifest". This is a toml specification of a WASM job type. It includes a name, a namespace, a path to a binary, and some human-readable descriptions.

Here's an example manifest:

```toml
name = "loudify"
namespace = "sh.serval"
binary = "../../wasm-samples/build/loudify.wasm"
version = "1"
description = "SHOUT SHOUT LET IT ALL OUT"
required_extensions = ["birdfeeder"]
```

Once a WASM manifest + executable has been stored in the system, you can execute that wasm using the
name in the manifest: `pounce run sh.serval.loudify inputfile.txt`

The local blob storage has been reimplemented to use the [cacache](https://lib.rs/crates/cacache)
crate. This is a content-adddressable store that *also* allows lookup of items by key. In our case
we store manifests with keys like `name.manifest.toml` and wasm executables with names like
`name.wasm`. Execution of jobs is now more indirect than it was, in that we look up the manifest by
name, and from that look up where the binary is by version. This indirection allows us to support
versioning more fully in the long term.

This PR also significantly reworks the agent API and changes the CLI to support this new API. The agent endpoints now look like this:

- `GET /monitor/ping`: cheap health check
- `POST /v1/jobs/:name/run`: body is interpreted as job input
- `GET /v1/storage/manifests`: fetch a list of manifests
- `POST /v1/storage/manifests`: store a new manifest
- `GET /v1/storage/manifests/:name`: get a manifest by name
- `HEAD /v1/storage/manifests/:name`: 404s if manifest not found
- `PUT /v1/storage/manifests/:name/executable/:version`: store a new wasm executable associated with the named task manifest
- `GET /v1/storage/manifests/:name/executable/:version`: get a wasm executable by manifest name & version
- `GET /v1/monitor/history`: very cheap run history; to be deleted
- `GET /v1/jobs`: list all running jobs; not implemented

Also, take a few minor updates to crates; revert change to the workspace syntax because it is making VSCode angry.

* Clean up error -> status mapping.

* WASM paths are now relative to the manifest path.

As is right and proper.

* better cargo run hint!
  • Loading branch information
ceejbot committed Feb 3, 2023
1 parent d3fd131 commit 8e4624d
Show file tree
Hide file tree
Showing 15 changed files with 1,083 additions and 453 deletions.
601 changes: 519 additions & 82 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -26,3 +26,4 @@ uuid = { version = "1.2.2", features = ["serde", "v4"] }
wasi-common = "5.0.0"
wasmtime = "5.0.0"
wasmtime-wasi = "5.0.0"

18 changes: 9 additions & 9 deletions agent/Cargo.toml
Expand Up @@ -5,19 +5,19 @@ edition = "2021"
license = "BSD-2-Clause-Patent"

[dependencies]
anyhow.workspace = true
anyhow = { workspace = true }
axum = { version = "0.6.1", features = ["json", "multipart"] }
dotenvy = "0.15.6"
engine = { path = "../engine" }
env_logger.workspace = true
env_logger = { workspace = true }
http = "0.2.8"
hyper = "0.14.23"
log.workspace = true
mdns-sd.workspace = true
reqwest.workspace = true
log = "0.4.17"
mdns-sd = { workspace = true }
reqwest = { workspace = true }
serde = { version = "1.0.149", features = ["serde_derive"] }
serde_json.workspace = true
tokio.workspace = true
tokio-util.workspace = true
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
utils = { path = "../utils" }
uuid.workspace = true
uuid = { workspace = true }
163 changes: 49 additions & 114 deletions agent/src/api/v1/jobs.rs
@@ -1,82 +1,55 @@
use std::{collections::HashMap, path::PathBuf};

use anyhow::Result;
use axum::{
extract::{Multipart, Path, State},
body::Bytes,
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
response::IntoResponse,
Json,
};
use engine::{errors::ServalEngineError, ServalEngine};
use utils::structs::WasmResult;
use utils::structs::Job;

use crate::structures::*;

/// This is the main worker endpoint. It accepts incoming jobs and runs them.
pub async fn incoming(state: State<AppState>, mut multipart: Multipart) -> Response {
let mut envelope: Option<Envelope> = None;
let mut binary: Option<Vec<u8>> = None;
let mut input: Option<Vec<u8>> = None;
/// Report on runtime history
pub async fn monitor_status(state: State<AppState>) -> Json<RunnerState> {
let state = state.lock().await;
Json(state.clone())
}

// chomp up form input here
while let Some(field) = multipart.next_field().await.unwrap() {
let name = field.name().unwrap().to_string();
let data = field.bytes().await.unwrap();
match name.as_str() {
"envelope" => {
let data = data.to_vec();
let Ok(parsed) = serde_json::from_slice(&data) else {
// this is not good enough
return (StatusCode::BAD_REQUEST, "job envelope is invalid".to_string()).into_response();
};
envelope = Some(parsed);
}
"input" => {
input = Some(data.to_vec());
}
"executable" => {
binary = Some(data.to_vec());
}
_ => {
log::info!("ignoring unknown field `{name}`");
}
}
}
/// Get running jobs
pub async fn running(_state: State<AppState>) -> impl IntoResponse {
StatusCode::NOT_IMPLEMENTED
}

/// This is the main worker endpoint. It accepts incoming jobs and runs them.
pub async fn run_job(
Path(name): Path<String>,
state: State<AppState>,
input: Bytes,
) -> impl IntoResponse {
let mut lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();
let Ok(manifest) = storage.manifest(&name).await else {
return (StatusCode::NOT_FOUND, "no manifest of that name found").into_response();
};

let Some(binary) = binary else {
return (
StatusCode::BAD_REQUEST,
"no wasm executable data provided!".to_string(),
)
.into_response();
let Ok(executable) = storage.executable_as_bytes(&name, manifest.version()).await else {
return (StatusCode::NOT_FOUND, "no executable found for manifest; key={key}").into_response();
};

let envelope = envelope.unwrap_or_default();
let metadata: JobMetadata = JobMetadata::from(envelope);
let job = Job::new(manifest, executable, input.to_vec());
log::info!(
"received WASM job; name={}; executable length={}; input length={}",
metadata.name(),
binary.len(),
input.as_ref().map(|input| input.len()).unwrap_or_else(|| 0),
"received WASM job; name={}; executable length={}; input length={}; id={}",
&job.manifest().fq_name(),
&job.executable().len(),
input.len(),
job.id()
);

run_job_inner(state, metadata, binary, input).await
}

async fn run_job_inner(
state: State<AppState>,
metadata: JobMetadata,
binary: Vec<u8>,
input: Option<Vec<u8>>,
) -> Response {
let mut state = state.lock().await;

// Poor human's history tracking here. We'll need to do better at some point.
// E.g., handle overflows. That would be some nice uptime.
state.total += 1;
state
.jobs
.insert(metadata.id().to_string(), metadata.clone());
lock.total += 1;
lock.jobs.insert(job.id().to_string(), job.clone());

let start = std::time::Instant::now();

Expand All @@ -85,17 +58,24 @@ async fn run_job_inner(
// The correct response by design is a 202 Accepted plus the metadata object.
// TODO: SER-38 - capture exit code for failed jobs
log::info!(
"about to run job name={}; id={}; executable size={}",
metadata.name(),
metadata.id(),
binary.len()
"about to run job name=TODO; id={}; executable size={}",
job.id(),
job.executable().len()
);
match execute_job(binary, input, &state.extensions).await {

let extensions = lock.extensions.clone();

let Ok(mut engine) = ServalEngine::new(extensions) else {
return (StatusCode::INTERNAL_SERVER_ERROR, "unable to create wasm engine").into_response();
};
let result = engine.execute(job.executable(), job.input());

match result {
Ok(result) => {
// We're not doing anything with stderr here.
log::info!(
"job completed; job={}; code={}; elapsed_ms={}",
metadata.id(),
job.id(),
result.code,
start.elapsed().as_millis()
);
Expand All @@ -117,53 +97,8 @@ async fn run_job_inner(
(StatusCode::OK, stderr).into_response()
}
Err(e) => {
state.errors += 1;
lock.errors += 1;
(StatusCode::BAD_REQUEST, e.to_string()).into_response()
}
}
}

/// Run a job in the wasm engine.
// Probably can vanish because there's only one caller.
async fn execute_job(
executable: Vec<u8>,
input: Option<Vec<u8>>,
extensions: &HashMap<String, PathBuf>,
) -> Result<WasmResult, ServalEngineError> {
let stdin = input.unwrap_or_default();

let mut engine = ServalEngine::new(extensions.clone())?;
let result = engine.execute(&executable, &stdin)?;

Ok(result)
}

/// Run a previously-stored job by address. Fast hack. Feel free to improve with an input feature.
pub async fn run_stored_job(
state: State<AppState>,
Path(blob_addr): Path<String>,
) -> impl IntoResponse {
let locked = state.lock().await;

let Some(storage) = locked.storage.as_ref() else {
// todo: in this case, we should proxy this request to another node that is advertising the serval_storage role
return (StatusCode::SERVICE_UNAVAILABLE, "Storage is not available").into_response();
};

let Ok(binary) = storage.get_bytes(&blob_addr).await else {
return (
StatusCode::NOT_FOUND,
format!("Blob {} not found", &blob_addr),
)
.into_response();
};

let metadata = JobMetadata::new(blob_addr.clone(), "stored binary".to_string());
drop(locked);
run_job_inner(state, metadata, binary, None).await
}

pub async fn monitor_history(state: State<AppState>) -> Json<RunnerState> {
let state = state.lock().await;
Json(state.clone())
}
112 changes: 92 additions & 20 deletions agent/src/api/v1/storage.rs
Expand Up @@ -3,64 +3,103 @@ use axum::{
extract::{Path, State},
http::{header, StatusCode},
response::IntoResponse,
Json,
};

use utils::errors::ServalError;
use utils::structs::Manifest;

use crate::structures::AppState;

pub async fn get_blob(
Path(blob_addr): Path<String>,
/// Fetch an executable by fully-qualified manifest name.
pub async fn get_executable(
Path((name, version)): Path<(String, String)>,
State(state): State<AppState>,
) -> impl IntoResponse {
// Yeah, I don't like this.
let state = state.lock().await;
let storage = state.storage.as_ref().unwrap();

match storage.get_stream(&blob_addr).await {
match storage.executable_as_stream(&name, &version).await {
Ok(stream) => {
let body = StreamBody::new(stream);
let headers = [(
header::CONTENT_TYPE,
String::from("application/octet-stream"),
)];

log::info!("Serving blob; addr={}", &blob_addr);
log::info!("Serving job binary; name={}", &name);
(headers, body).into_response()
}
Err(e) => {
log::warn!("error reading blob; addr={}; error={}", blob_addr, e);
log::warn!("error reading job binary; name={}; error={}", name, e);
e.into_response()
}
}
}

pub async fn store_blob(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
/// Fetch task manifest by name. The manifest is returned as json.
pub async fn get_manifest(
Path(name): Path<String>,
State(state): State<AppState>,
) -> impl IntoResponse {
// Yeah, I don't like this.
let state = state.lock().await;
let storage = state.storage.as_ref().unwrap();
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

match storage.store(&body).await {
Ok((new, address)) => {
log::info!("Stored blob; addr={} size={}", &address, body.len());
if new {
(StatusCode::CREATED, address).into_response()
} else {
(StatusCode::OK, address).into_response()
}
match storage.manifest(&name).await {
Ok(v) => {
log::info!("Serving job manifest; name={}", &name);
let stringified = v.to_string();
let headers = [(header::CONTENT_TYPE, String::from("application/toml"))];
(headers, stringified).into_response()
}
Err(e) => {
log::warn!("error reading job metadata; name={}; error={}", &name, e);
e.into_response()
}
}
}

/// Store a job with its metadata.
pub async fn store_executable(
State(state): State<AppState>,
Path((name, version)): Path<(String, String)>,
body: Bytes,
) -> impl IntoResponse {
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

let Ok(manifest) = storage.manifest(&name).await else {
return (StatusCode::NOT_FOUND, format!("no manifest of that name found; name={name}")).into_response();
};

let bytes = body.to_vec();

match storage.store_executable(&name, &version, &bytes).await {
Ok(integrity) => {
log::info!(
"Stored new executable; name={}@{}; executable_hash={}; size={}",
manifest.fq_name(),
version,
integrity,
bytes.len()
);
(StatusCode::CREATED, integrity).into_response()
}
Err(_e) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
Err(e) => e.into_response(),
}
}

pub async fn has_blob(Path(blob_addr): Path<String>, State(state): State<AppState>) -> StatusCode {
/// Returns true if this node has access to the given task type, specified by fully-qualified name.
pub async fn has_manifest(Path(name): Path<String>, State(state): State<AppState>) -> StatusCode {
// Yeah, I don't like this.
let state = state.lock().await;
let storage = state.storage.as_ref().unwrap();

match storage.has_blob(&blob_addr).await {
match storage.data_exists_by_key(&name).await {
Ok(exists) => {
log::info!("Has blob?; exists={exists} addr={blob_addr}");
log::info!("Has manifest?; exists={exists} addr={name}");
if exists {
StatusCode::OK
} else {
Expand All @@ -71,3 +110,36 @@ pub async fn has_blob(Path(blob_addr): Path<String>, State(state): State<AppStat
Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

pub async fn list_manifests(State(state): State<AppState>) -> impl IntoResponse {
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

match storage.manifest_names() {
Ok(list) => (StatusCode::OK, Json(list)).into_response(),
Err(e) => e.into_response(),
}
}

pub async fn store_manifest(State(state): State<AppState>, body: String) -> impl IntoResponse {
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

match Manifest::from_string(&body) {
Ok(manifest) => {
log::info!("storing manifest for job={}", manifest.fq_name());
match storage.store_manifest(&manifest).await {
Ok(integrity) => {
log::info!(
"Stored new manifest; name={}; manifest_hash={}",
manifest.fq_name(),
integrity.to_string(),
);
(StatusCode::CREATED, integrity.to_string()).into_response()
}
Err(e) => e.into_response(),
}
}
Err(e) => (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
}
}

0 comments on commit 8e4624d

Please sign in to comment.