Introduce the concept of a WASM manifest; use cacache for local storage #39

Merged
merged 6 commits into from Feb 3, 2023
601 changes: 519 additions & 82 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -26,3 +26,4 @@ uuid = { version = "1.2.2", features = ["serde", "v4"] }
wasi-common = "5.0.0"
wasmtime = "5.0.0"
wasmtime-wasi = "5.0.0"

18 changes: 9 additions & 9 deletions agent/Cargo.toml
@@ -5,19 +5,19 @@ edition = "2021"
license = "BSD-2-Clause-Patent"

[dependencies]
anyhow.workspace = true
anyhow = { workspace = true }
axum = { version = "0.6.1", features = ["json", "multipart"] }
dotenvy = "0.15.6"
engine = { path = "../engine" }
env_logger.workspace = true
env_logger = { workspace = true }
http = "0.2.8"
hyper = "0.14.23"
log.workspace = true
mdns-sd.workspace = true
reqwest.workspace = true
log = "0.4.17"
mdns-sd = { workspace = true }
reqwest = { workspace = true }
serde = { version = "1.0.149", features = ["serde_derive"] }
serde_json.workspace = true
tokio.workspace = true
tokio-util.workspace = true
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
utils = { path = "../utils" }
uuid.workspace = true
uuid = { workspace = true }
163 changes: 49 additions & 114 deletions agent/src/api/v1/jobs.rs
@@ -1,82 +1,55 @@
use std::{collections::HashMap, path::PathBuf};

use anyhow::Result;
use axum::{
extract::{Multipart, Path, State},
body::Bytes,
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
response::IntoResponse,
Json,
};
use engine::{errors::ServalEngineError, ServalEngine};
use utils::structs::WasmResult;
use utils::structs::Job;

use crate::structures::*;

/// This is the main worker endpoint. It accepts incoming jobs and runs them.
pub async fn incoming(state: State<AppState>, mut multipart: Multipart) -> Response {
let mut envelope: Option<Envelope> = None;
let mut binary: Option<Vec<u8>> = None;
let mut input: Option<Vec<u8>> = None;
/// Report on runtime history
pub async fn monitor_status(state: State<AppState>) -> Json<RunnerState> {
let state = state.lock().await;
Json(state.clone())
}

// chomp up form input here
while let Some(field) = multipart.next_field().await.unwrap() {
let name = field.name().unwrap().to_string();
let data = field.bytes().await.unwrap();
match name.as_str() {
"envelope" => {
let data = data.to_vec();
let Ok(parsed) = serde_json::from_slice(&data) else {
// this is not good enough
return (StatusCode::BAD_REQUEST, "job envelope is invalid".to_string()).into_response();
};
envelope = Some(parsed);
}
"input" => {
input = Some(data.to_vec());
}
"executable" => {
binary = Some(data.to_vec());
}
_ => {
log::info!("ignoring unknown field `{name}`");
}
}
}
/// Get running jobs
pub async fn running(_state: State<AppState>) -> impl IntoResponse {
StatusCode::NOT_IMPLEMENTED
}

/// This is the main worker endpoint. It accepts incoming jobs and runs them.
pub async fn run_job(
Path(name): Path<String>,
state: State<AppState>,
input: Bytes,
) -> impl IntoResponse {
let mut lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();
let Ok(manifest) = storage.manifest(&name).await else {
Contributor

Does this mean that only nodes that have storage locally available can run jobs?

Contributor Author

At the moment, yes. I think the immediate next follow-up work is to make a node ask if it has the appropriate binary, and if not, find it.

Contributor

Makes sense. Thoughts on swapping BlobStore out for an abstract trait that has CacacheBlobStore as one implementation and RemoteBlobStore (something that proxies to a serval_storage instance) as the other? If that sounds reasonable to you, I am happy to start chipping away at it.
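
For illustration, the trait split proposed in this thread might look roughly like the sketch below. Everything in it is an assumption made for the example: the method names, the synchronous signatures, and the in-memory map standing in for cacache are not taken from the PR, and a real `RemoteBlobStore` would issue network requests to a storage node rather than wrap another store.

```rust
use std::collections::HashMap;

/// Hypothetical abstraction over blob storage. The method names and
/// signatures are illustrative and are not taken from the PR.
trait BlobStore {
    /// Return the bytes stored under `key`, if any.
    fn get_bytes(&self, key: &str) -> Option<Vec<u8>>;
    /// Store `bytes` under `key`, replacing any previous value.
    fn store(&mut self, key: &str, bytes: &[u8]);
}

/// Stand-in for a cacache-backed local store. An in-memory map keeps the
/// sketch free of external dependencies.
#[derive(Default)]
struct LocalBlobStore {
    blobs: HashMap<String, Vec<u8>>,
}

impl BlobStore for LocalBlobStore {
    fn get_bytes(&self, key: &str) -> Option<Vec<u8>> {
        self.blobs.get(key).cloned()
    }

    fn store(&mut self, key: &str, bytes: &[u8]) {
        self.blobs.insert(key.to_string(), bytes.to_vec());
    }
}

/// Stand-in for a store that proxies to a node advertising the storage role.
/// Here the "remote" side is simply another `BlobStore`.
struct RemoteBlobStore<S: BlobStore> {
    upstream: S,
}

impl<S: BlobStore> BlobStore for RemoteBlobStore<S> {
    fn get_bytes(&self, key: &str) -> Option<Vec<u8>> {
        self.upstream.get_bytes(key)
    }

    fn store(&mut self, key: &str, bytes: &[u8]) {
        self.upstream.store(key, bytes);
    }
}

/// Callers depend only on the trait, so a job runner would not need to know
/// whether the bytes came from local or remote storage.
fn load_executable(store: &dyn BlobStore, key: &str) -> Result<Vec<u8>, String> {
    store
        .get_bytes(key)
        .ok_or_else(|| format!("no executable found; key={key}"))
}
```

With this shape, a handler such as `run_job` could accept either implementation behind `&dyn BlobStore`, which is one way to let nodes without local storage still run jobs.
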

return (StatusCode::NOT_FOUND, "no manifest of that name found").into_response();
};

let Some(binary) = binary else {
return (
StatusCode::BAD_REQUEST,
"no wasm executable data provided!".to_string(),
)
.into_response();
let Ok(executable) = storage.executable_as_bytes(&name, manifest.version()).await else {
return (StatusCode::NOT_FOUND, format!("no executable found for manifest; name={name}")).into_response();
};

let envelope = envelope.unwrap_or_default();
let metadata: JobMetadata = JobMetadata::from(envelope);
let job = Job::new(manifest, executable, input.to_vec());
log::info!(
"received WASM job; name={}; executable length={}; input length={}",
metadata.name(),
binary.len(),
input.as_ref().map(|input| input.len()).unwrap_or_else(|| 0),
"received WASM job; name={}; executable length={}; input length={}; id={}",
&job.manifest().fq_name(),
&job.executable().len(),
input.len(),
job.id()
);

run_job_inner(state, metadata, binary, input).await
}

async fn run_job_inner(
state: State<AppState>,
metadata: JobMetadata,
binary: Vec<u8>,
input: Option<Vec<u8>>,
) -> Response {
let mut state = state.lock().await;

// Poor human's history tracking here. We'll need to do better at some point.
// E.g., handle overflows. That would be some nice uptime.
state.total += 1;
state
.jobs
.insert(metadata.id().to_string(), metadata.clone());
lock.total += 1;
lock.jobs.insert(job.id().to_string(), job.clone());

let start = std::time::Instant::now();

@@ -85,17 +58,24 @@ async fn run_job_inner(
// The correct response by design is a 202 Accepted plus the metadata object.
// TODO: SER-38 - capture exit code for failed jobs
log::info!(
"about to run job name={}; id={}; executable size={}",
metadata.name(),
metadata.id(),
binary.len()
"about to run job name=TODO; id={}; executable size={}",
job.id(),
job.executable().len()
);
match execute_job(binary, input, &state.extensions).await {

let extensions = lock.extensions.clone();

let Ok(mut engine) = ServalEngine::new(extensions) else {
return (StatusCode::INTERNAL_SERVER_ERROR, "unable to create wasm engine").into_response();
};
let result = engine.execute(job.executable(), job.input());

match result {
Ok(result) => {
// We're not doing anything with stderr here.
log::info!(
"job completed; job={}; code={}; elapsed_ms={}",
metadata.id(),
job.id(),
result.code,
start.elapsed().as_millis()
);
@@ -117,53 +97,8 @@ async fn run_job_inner(
(StatusCode::OK, stderr).into_response()
}
Err(e) => {
state.errors += 1;
lock.errors += 1;
(StatusCode::BAD_REQUEST, e.to_string()).into_response()
}
}
}

/// Run a job in the wasm engine.
// Probably can vanish because there's only one caller.
async fn execute_job(
executable: Vec<u8>,
input: Option<Vec<u8>>,
extensions: &HashMap<String, PathBuf>,
) -> Result<WasmResult, ServalEngineError> {
let stdin = input.unwrap_or_default();

let mut engine = ServalEngine::new(extensions.clone())?;
let result = engine.execute(&executable, &stdin)?;

Ok(result)
}

/// Run a previously-stored job by address. Fast hack. Feel free to improve with an input feature.
pub async fn run_stored_job(
state: State<AppState>,
Path(blob_addr): Path<String>,
) -> impl IntoResponse {
let locked = state.lock().await;

let Some(storage) = locked.storage.as_ref() else {
// todo: in this case, we should proxy this request to another node that is advertising the serval_storage role
return (StatusCode::SERVICE_UNAVAILABLE, "Storage is not available").into_response();
};

let Ok(binary) = storage.get_bytes(&blob_addr).await else {
return (
StatusCode::NOT_FOUND,
format!("Blob {} not found", &blob_addr),
)
.into_response();
};

let metadata = JobMetadata::new(blob_addr.clone(), "stored binary".to_string());
drop(locked);
run_job_inner(state, metadata, binary, None).await
}

pub async fn monitor_history(state: State<AppState>) -> Json<RunnerState> {
let state = state.lock().await;
Json(state.clone())
}
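
The "Poor human's history tracking" comment in `run_job` notes that the counters should eventually handle overflow. A minimal sketch of one option, saturating arithmetic, follows. The `Counters` struct and its `usize` fields are assumptions made for the example; the actual field types on the PR's `RunnerState` are not shown in this diff.

```rust
/// Hypothetical stand-in for the `total` and `errors` counters kept on the
/// runner state. The `usize` field types are an assumption.
#[derive(Debug, Default)]
struct Counters {
    total: usize,
    errors: usize,
}

impl Counters {
    /// Count a job that was accepted. The counter pins at `usize::MAX`
    /// instead of panicking (debug builds) or wrapping (release builds).
    fn record_job(&mut self) {
        self.total = self.total.saturating_add(1);
    }

    /// Count a job that failed, with the same saturating behavior.
    fn record_error(&mut self) {
        self.errors = self.errors.saturating_add(1);
    }
}
```

Saturating keeps the handler infallible at the cost of an inexact count once the maximum is reached; `checked_add` would be the alternative if the overflow itself needs to be reported.
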
112 changes: 92 additions & 20 deletions agent/src/api/v1/storage.rs
@@ -3,64 +3,103 @@ use axum::{
extract::{Path, State},
http::{header, StatusCode},
response::IntoResponse,
Json,
};

use utils::errors::ServalError;
use utils::structs::Manifest;

use crate::structures::AppState;

pub async fn get_blob(
Path(blob_addr): Path<String>,
/// Fetch an executable by fully-qualified manifest name.
pub async fn get_executable(
Path((name, version)): Path<(String, String)>,
State(state): State<AppState>,
) -> impl IntoResponse {
// Yeah, I don't like this.
let state = state.lock().await;
let storage = state.storage.as_ref().unwrap();

match storage.get_stream(&blob_addr).await {
match storage.executable_as_stream(&name, &version).await {
Ok(stream) => {
let body = StreamBody::new(stream);
let headers = [(
header::CONTENT_TYPE,
String::from("application/octet-stream"),
)];

log::info!("Serving blob; addr={}", &blob_addr);
log::info!("Serving job binary; name={}", &name);
(headers, body).into_response()
}
Err(e) => {
log::warn!("error reading blob; addr={}; error={}", blob_addr, e);
log::warn!("error reading job binary; name={}; error={}", name, e);
e.into_response()
}
}
}

pub async fn store_blob(State(state): State<AppState>, body: Bytes) -> impl IntoResponse {
/// Fetch task manifest by name. The manifest is returned as TOML (content type `application/toml`).
pub async fn get_manifest(
Path(name): Path<String>,
State(state): State<AppState>,
) -> impl IntoResponse {
// Yeah, I don't like this.
let state = state.lock().await;
let storage = state.storage.as_ref().unwrap();
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

match storage.store(&body).await {
Ok((new, address)) => {
log::info!("Stored blob; addr={} size={}", &address, body.len());
if new {
(StatusCode::CREATED, address).into_response()
} else {
(StatusCode::OK, address).into_response()
}
match storage.manifest(&name).await {
Ok(v) => {
log::info!("Serving job manifest; name={}", &name);
let stringified = v.to_string();
let headers = [(header::CONTENT_TYPE, String::from("application/toml"))];
(headers, stringified).into_response()
}
Err(e) => {
log::warn!("error reading job metadata; name={}; error={}", &name, e);
e.into_response()
}
}
}

/// Store a job with its metadata.
pub async fn store_executable(
State(state): State<AppState>,
Path((name, version)): Path<(String, String)>,
body: Bytes,
) -> impl IntoResponse {
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

let Ok(manifest) = storage.manifest(&name).await else {
return (StatusCode::NOT_FOUND, format!("no manifest of that name found; name={name}")).into_response();
};

let bytes = body.to_vec();

match storage.store_executable(&name, &version, &bytes).await {
Ok(integrity) => {
log::info!(
"Stored new executable; name={}@{}; executable_hash={}; size={}",
manifest.fq_name(),
version,
integrity,
bytes.len()
);
(StatusCode::CREATED, integrity).into_response()
}
Err(_e) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
Err(e) => e.into_response(),
}
}

pub async fn has_blob(Path(blob_addr): Path<String>, State(state): State<AppState>) -> StatusCode {
/// Responds with 200 OK if this node has access to the given task type, specified by fully-qualified name.
pub async fn has_manifest(Path(name): Path<String>, State(state): State<AppState>) -> StatusCode {
// Yeah, I don't like this.
let state = state.lock().await;
let storage = state.storage.as_ref().unwrap();

match storage.has_blob(&blob_addr).await {
match storage.data_exists_by_key(&name).await {
Ok(exists) => {
log::info!("Has blob?; exists={exists} addr={blob_addr}");
log::info!("Has manifest?; exists={exists} addr={name}");
if exists {
StatusCode::OK
} else {
@@ -71,3 +110,36 @@ pub async fn has_blob(Path(blob_addr): Path<String>, State(state): State<AppStat
Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

pub async fn list_manifests(State(state): State<AppState>) -> impl IntoResponse {
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

match storage.manifest_names() {
Ok(list) => (StatusCode::OK, Json(list)).into_response(),
Err(e) => e.into_response(),
}
}

pub async fn store_manifest(State(state): State<AppState>, body: String) -> impl IntoResponse {
let lock = state.lock().await;
let storage = lock.storage.as_ref().unwrap();

match Manifest::from_string(&body) {
Ok(manifest) => {
log::info!("storing manifest for job={}", manifest.fq_name());
match storage.store_manifest(&manifest).await {
Ok(integrity) => {
log::info!(
"Stored new manifest; name={}; manifest_hash={}",
manifest.fq_name(),
integrity.to_string(),
);
(StatusCode::CREATED, integrity.to_string()).into_response()
}
Err(e) => e.into_response(),
}
}
Err(e) => (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
}
}
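
Several handlers above call `storage.as_ref().unwrap()`, which panics on a node that has no storage configured, whereas the removed `run_stored_job` answered with `SERVICE_UNAVAILABLE` in that case. The sketch below shows one way to restore that behavior with a shared helper. It is a dependency-free illustration: `ApiError`, `Storage`, `RunnerState`, and both function names are hypothetical stand-ins rather than the PR's types, and the status codes are plain integers instead of axum's `StatusCode`.

```rust
/// Hypothetical error type for the sketch, not part of the PR.
#[derive(Debug, PartialEq)]
enum ApiError {
    StorageUnavailable,
    ManifestNotFound(String),
}

impl ApiError {
    /// HTTP status code a handler would respond with for this error.
    fn status_code(&self) -> u16 {
        match self {
            ApiError::StorageUnavailable => 503,
            ApiError::ManifestNotFound(_) => 404,
        }
    }
}

/// Simplified stand-in for the node's storage: just a list of manifest names.
struct Storage {
    manifest_names: Vec<String>,
}

/// Simplified stand-in for the locked application state.
struct RunnerState {
    storage: Option<Storage>,
}

/// Replace `storage.as_ref().unwrap()` with an error the caller can return.
fn storage_or_unavailable(state: &RunnerState) -> Result<&Storage, ApiError> {
    state.storage.as_ref().ok_or(ApiError::StorageUnavailable)
}

/// Look up a manifest by name, reporting missing storage as a 503 and a
/// missing manifest as a 404.
fn find_manifest<'a>(state: &'a RunnerState, name: &str) -> Result<&'a str, ApiError> {
    let storage = storage_or_unavailable(state)?;
    storage
        .manifest_names
        .iter()
        .map(String::as_str)
        .find(|candidate| *candidate == name)
        .ok_or_else(|| ApiError::ManifestNotFound(name.to_string()))
}
```

In the real handlers the same idea would likely be expressed by implementing `IntoResponse` for the error type, so that each handler can use `?` instead of repeating the check.
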