From 45962a9255c42dd61cfe60e84df30048c073bbf1 Mon Sep 17 00:00:00 2001 From: Lakshan Perera Date: Thu, 6 Apr 2023 11:37:41 +1000 Subject: [PATCH] fix: return an error when service path does not exist --- Cargo.lock | 1 + base/Cargo.toml | 1 + base/src/js_worker/user_workers.rs | 16 +++++++++-- base/src/worker_ctx.rs | 44 ++++++++++++++++++++---------- examples/main/index.ts | 26 ++++++++++++------ 5 files changed, 61 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c698a2c89..1d5b2ec46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,6 +186,7 @@ dependencies = [ "serde", "tokio", "url", + "uuid", "v8", ] diff --git a/base/Cargo.toml b/base/Cargo.toml index de741f955..78300c4cd 100644 --- a/base/Cargo.toml +++ b/base/Cargo.toml @@ -31,4 +31,5 @@ reqwest = { version = "0.11.13" } serde = { version = "1.0.149", features = ["derive"] } tokio = { version = "1.24", features = ["full"] } url = { version = "2.3.1" } +uuid = { version = "1.1.2", features = ["v4"] } v8 = { version = "0.60.1", default-features = false } diff --git a/base/src/js_worker/user_workers.rs b/base/src/js_worker/user_workers.rs index 2272be5ee..b2da711e6 100644 --- a/base/src/js_worker/user_workers.rs +++ b/base/src/js_worker/user_workers.rs @@ -1,5 +1,6 @@ use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerMsgs, UserWorkerOptions}; +use anyhow::Error; use deno_core::error::{custom_error, type_error, AnyError}; use deno_core::futures::stream::Peekable; use deno_core::futures::Stream; @@ -27,6 +28,7 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; pub fn init() -> Extension { Extension::builder("custom:user_workers") @@ -54,7 +56,7 @@ pub async fn op_user_worker_create( ) -> Result { let op_state = state.borrow(); let tx = op_state.borrow::>(); - let (result_tx, result_rx) = oneshot::channel::(); + let (result_tx, result_rx) = oneshot::channel::>(); let mut env_vars = HashMap::new(); for (key, value) in env_vars_vec { @@ -79,8 +81,15 @@ pub async fn op_user_worker_create( )); } + // channel returns a Result, we need to unwrap it first; let result = result.unwrap(); - Ok(result.key) + if result.is_err() { + return Err(custom_error( + "create_user_worker_error", + result.unwrap_err().to_string(), + )); + } + Ok(result.unwrap().key.to_string()) } #[derive(Deserialize, Debug)] @@ -285,7 +294,8 @@ pub async fn op_user_worker_fetch_send( .ok() .expect("multiple op_user_worker_fetch_send ongoing"); let (result_tx, result_rx) = oneshot::channel::>(); - tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx)); + let uuid = Uuid::parse_str(key.as_str())?; + tx.send(UserWorkerMsgs::SendRequest(uuid, request.0, result_tx)); let result = result_rx.await; if result.is_err() { diff --git a/base/src/worker_ctx.rs b/base/src/worker_ctx.rs index 119b06e12..40eedf578 100644 --- a/base/src/worker_ctx.rs +++ b/base/src/worker_ctx.rs @@ -1,6 +1,6 @@ use crate::js_worker::{MainWorker, UserWorker}; -use anyhow::Error; +use anyhow::{bail, Error}; use hyper::{Body, Request, Response}; use log::{debug, error}; use std::collections::HashMap; @@ -11,7 +11,9 @@ use std::thread; use tokio::net::UnixStream; use tokio::sync::RwLock; use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; +#[derive(Debug)] pub struct WorkerContext { handle: thread::JoinHandle>, request_sender: hyper::client::conn::SendRequest, @@ -41,6 +43,10 @@ impl WorkerContext { let import_map_path = options.import_map_path; let user_worker_msgs_tx = options.user_worker_msgs_tx; + if (!service_path.exists()) { + return bail!("main function does not exist {:?}", &service_path); + } + // create a unix socket pair let (sender_stream, recv_stream) = UnixStream::pair()?; @@ -90,6 +96,10 @@ impl WorkerContext { let import_map_path = options.import_map_path; let env_vars = options.env_vars; + if (!service_path.exists()) { + return bail!("user function does not exist {:?}", &service_path); + } + // create a unix socket pair let (sender_stream, recv_stream) = UnixStream::pair()?; @@ -141,13 +151,16 @@ impl WorkerContext { #[derive(Debug)] pub struct CreateUserWorkerResult { - pub key: String, + pub key: Uuid, } #[derive(Debug)] pub enum UserWorkerMsgs { - Create(UserWorkerOptions, oneshot::Sender), - SendRequest(String, Request, oneshot::Sender>), + Create( + UserWorkerOptions, + oneshot::Sender>, + ), + SendRequest(Uuid, Request, oneshot::Sender>), } pub struct WorkerPool { @@ -174,23 +187,24 @@ impl WorkerPool { let main_worker = Arc::new(RwLock::new(main_worker_ctx)); tokio::spawn(async move { - let mut user_workers: HashMap>> = HashMap::new(); + let mut user_workers: HashMap>> = HashMap::new(); loop { match user_worker_msgs_rx.recv().await { None => break, Some(UserWorkerMsgs::Create(worker_options, tx)) => { - let key = worker_options.service_path.display().to_string(); - if !user_workers.contains_key(&key) { - // TODO: handle errors - let user_worker_ctx = WorkerContext::new_user_worker(worker_options) - .await - .unwrap(); - user_workers - .insert(key.clone(), Arc::new(RwLock::new(user_worker_ctx))); + let key = Uuid::new_v4(); + let user_worker_ctx = WorkerContext::new_user_worker(worker_options).await; + if !user_worker_ctx.is_err() { + user_workers.insert( + key.clone(), + Arc::new(RwLock::new(user_worker_ctx.unwrap())), + ); + + tx.send(Ok(CreateUserWorkerResult { key })); + } else { + tx.send(Err(user_worker_ctx.unwrap_err())); } - - tx.send(CreateUserWorkerResult { key }); } Some(UserWorkerMsgs::SendRequest(key, req, tx)) => { // TODO: handle errors diff --git a/examples/main/index.ts b/examples/main/index.ts index 54bee5f84..52b990b43 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -24,13 +24,21 @@ serve(async (req: Request) => { const no_module_cache = false; const import_map_path = null; const env_vars = []; - const worker = await EdgeRuntime.userWorkers.create({ - service_path, - memory_limit_mb, - worker_timeout_ms, - no_module_cache, - import_map_path, - env_vars - }); - return worker.fetch(req); + try { + const worker = await EdgeRuntime.userWorkers.create({ + service_path, + memory_limit_mb, + worker_timeout_ms, + no_module_cache, + import_map_path, + env_vars + }); + return worker.fetch(req); + } catch (e) { + const error = { msg: e.toString() } + return new Response( + JSON.stringify(error), + { status: 500, headers: { "Content-Type": "application/json" } }, + ) + } })