Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ reqwest = { version = "0.11.13" }
serde = { version = "1.0.149", features = ["derive"] }
tokio = { version = "1.24", features = ["full"] }
url = { version = "2.3.1" }
uuid = { version = "1.1.2", features = ["v4"] }
v8 = { version = "0.60.1", default-features = false }
16 changes: 13 additions & 3 deletions base/src/js_worker/user_workers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerMsgs, UserWorkerOptions};

use anyhow::Error;
use deno_core::error::{custom_error, type_error, AnyError};
use deno_core::futures::stream::Peekable;
use deno_core::futures::Stream;
Expand Down Expand Up @@ -27,6 +28,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

pub fn init() -> Extension {
Extension::builder("custom:user_workers")
Expand Down Expand Up @@ -54,7 +56,7 @@ pub async fn op_user_worker_create(
) -> Result<String, AnyError> {
let op_state = state.borrow();
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
let (result_tx, result_rx) = oneshot::channel::<CreateUserWorkerResult>();
let (result_tx, result_rx) = oneshot::channel::<Result<CreateUserWorkerResult, Error>>();

let mut env_vars = HashMap::new();
for (key, value) in env_vars_vec {
Expand All @@ -79,8 +81,15 @@ pub async fn op_user_worker_create(
));
}

// channel returns a Result<T, E>, we need to unwrap it first;
let result = result.unwrap();
Ok(result.key)
if result.is_err() {
return Err(custom_error(
"create_user_worker_error",
result.unwrap_err().to_string(),
));
}
Ok(result.unwrap().key.to_string())
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -285,7 +294,8 @@ pub async fn op_user_worker_fetch_send(
.ok()
.expect("multiple op_user_worker_fetch_send ongoing");
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx));
let uuid = Uuid::parse_str(key.as_str())?;
tx.send(UserWorkerMsgs::SendRequest(uuid, request.0, result_tx));

let result = result_rx.await;
if result.is_err() {
Expand Down
44 changes: 29 additions & 15 deletions base/src/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::js_worker::{MainWorker, UserWorker};

use anyhow::Error;
use anyhow::{bail, Error};
use hyper::{Body, Request, Response};
use log::{debug, error};
use std::collections::HashMap;
Expand All @@ -11,7 +11,9 @@ use std::thread;
use tokio::net::UnixStream;
use tokio::sync::RwLock;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

#[derive(Debug)]
pub struct WorkerContext {
handle: thread::JoinHandle<Result<(), Error>>,
request_sender: hyper::client::conn::SendRequest<Body>,
Expand Down Expand Up @@ -41,6 +43,10 @@ impl WorkerContext {
let import_map_path = options.import_map_path;
let user_worker_msgs_tx = options.user_worker_msgs_tx;

if (!service_path.exists()) {
return bail!("main function does not exist {:?}", &service_path);
}

// create a unix socket pair
let (sender_stream, recv_stream) = UnixStream::pair()?;

Expand Down Expand Up @@ -90,6 +96,10 @@ impl WorkerContext {
let import_map_path = options.import_map_path;
let env_vars = options.env_vars;

if (!service_path.exists()) {
return bail!("user function does not exist {:?}", &service_path);
}

// create a unix socket pair
let (sender_stream, recv_stream) = UnixStream::pair()?;

Expand Down Expand Up @@ -141,13 +151,16 @@ impl WorkerContext {

#[derive(Debug)]
pub struct CreateUserWorkerResult {
pub key: String,
pub key: Uuid,
}

#[derive(Debug)]
pub enum UserWorkerMsgs {
Create(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
SendRequest(String, Request<Body>, oneshot::Sender<Response<Body>>),
Create(
UserWorkerOptions,
oneshot::Sender<Result<CreateUserWorkerResult, Error>>,
),
SendRequest(Uuid, Request<Body>, oneshot::Sender<Response<Body>>),
}

pub struct WorkerPool {
Expand All @@ -174,23 +187,24 @@ impl WorkerPool {
let main_worker = Arc::new(RwLock::new(main_worker_ctx));

tokio::spawn(async move {
let mut user_workers: HashMap<String, Arc<RwLock<WorkerContext>>> = HashMap::new();
let mut user_workers: HashMap<Uuid, Arc<RwLock<WorkerContext>>> = HashMap::new();

loop {
match user_worker_msgs_rx.recv().await {
None => break,
Some(UserWorkerMsgs::Create(worker_options, tx)) => {
let key = worker_options.service_path.display().to_string();
if !user_workers.contains_key(&key) {
// TODO: handle errors
let user_worker_ctx = WorkerContext::new_user_worker(worker_options)
.await
.unwrap();
user_workers
.insert(key.clone(), Arc::new(RwLock::new(user_worker_ctx)));
let key = Uuid::new_v4();
let user_worker_ctx = WorkerContext::new_user_worker(worker_options).await;
if !user_worker_ctx.is_err() {
user_workers.insert(
key.clone(),
Arc::new(RwLock::new(user_worker_ctx.unwrap())),
);

tx.send(Ok(CreateUserWorkerResult { key }));
} else {
tx.send(Err(user_worker_ctx.unwrap_err()));
}

tx.send(CreateUserWorkerResult { key });
}
Some(UserWorkerMsgs::SendRequest(key, req, tx)) => {
// TODO: handle errors
Expand Down
26 changes: 17 additions & 9 deletions examples/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,21 @@ serve(async (req: Request) => {
const no_module_cache = false;
const import_map_path = null;
const env_vars = [];
const worker = await EdgeRuntime.userWorkers.create({
service_path,
memory_limit_mb,
worker_timeout_ms,
no_module_cache,
import_map_path,
env_vars
});
return worker.fetch(req);
try {
const worker = await EdgeRuntime.userWorkers.create({
service_path,
memory_limit_mb,
worker_timeout_ms,
no_module_cache,
import_map_path,
env_vars
});
return worker.fetch(req);
} catch (e) {
const error = { msg: e.toString() }
return new Response(
JSON.stringify(error),
{ status: 500, headers: { "Content-Type": "application/json" } },
)
}
})