From 2af4023a68ba145c374e4ced19417ca44d8047fb Mon Sep 17 00:00:00 2001 From: Lakshan Perera Date: Wed, 1 Mar 2023 11:11:01 +1100 Subject: [PATCH 1/4] fix: send a signal to halt the worker when memory or time limit reached --- README.md | 8 ++++++-- base/src/js_worker.rs | 27 ++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index fb8b54af5..9cca241bc 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,16 @@ docker build -t edge-runtime . docker run -it --rm -p 9000:9000 -v /path/to/supabase/functions:/usr/services supabase/edge-runtime start --dir /usr/services ``` +## Architecture + +Server -> Base Worker -> User Function + ## TODO -* Support import maps * Check verify-jwt -* better error messages for incorrect module loading paths (local) * handle 404 errors +* better error messages for incorrect module loading paths +* better error messages for invalid import map paths * Support snapshotting the runtime * Support for private modules (DENO_AUTH_TOKENS) * HTTP/2 support (need the host to support SSL) diff --git a/base/src/js_worker.rs b/base/src/js_worker.rs index 5398bb75b..6e92c767f 100644 --- a/base/src/js_worker.rs +++ b/base/src/js_worker.rs @@ -55,6 +55,7 @@ pub async fn serve( ) }); + // send the tcp stram to the worker tcp_stream_tx.send(tcp_stream)?; debug!("js worker for {} started", service_name); @@ -94,6 +95,7 @@ fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) { ); } } + fn start_runtime( service_path: PathBuf, memory_limit_mb: u64, @@ -165,8 +167,8 @@ fn start_runtime( ..Default::default() }); - let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle(); let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::(); + let (halt_execution_tx, mut halt_execution_rx) = oneshot::channel::<()>(); // add a callback when a worker reaches its memory limit js_runtime.add_near_heap_limit_callback(move |cur, _init| { @@ -201,7 +203,13 @@ fn start_runtime( op_state.put::>(tcp_stream_rx); } - start_controller_thread(v8_thread_safe_handle, worker_timeout_ms, memory_limit_rx); + let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle(); + start_controller_thread( + v8_thread_safe_handle, + worker_timeout_ms, + memory_limit_rx, + halt_execution_tx, + ); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -211,7 +219,15 @@ fn start_runtime( let future = async move { let mod_id = js_runtime.load_main_module(&main_module_url, None).await?; let result = js_runtime.mod_evaluate(mod_id); - js_runtime.run_event_loop(false).await?; + + tokio::select! { + _ = js_runtime.run_event_loop(false) => { + debug!("event loop completed"); + } + _ = &mut halt_execution_rx => { + debug!("worker exectution halted"); + } + } result.await? }; @@ -230,6 +246,7 @@ fn start_controller_thread( v8_thread_safe_handle: v8::IsolateHandle, worker_timeout_ms: u64, mut memory_limit_rx: mpsc::UnboundedReceiver, + halt_execution_tx: oneshot::Sender<()>, ) { thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() @@ -255,5 +272,9 @@ fn start_controller_thread( } else { debug!("worker already terminated"); } + + halt_execution_tx + .send(()) + .expect("failed to send halt execution signal"); }); } From 4c5b9b974e338bab941c2e2f549c17c5e8cc4d8f Mon Sep 17 00:00:00 2001 From: Lakshan Perera Date: Wed, 1 Mar 2023 16:04:53 +1100 Subject: [PATCH 2/4] fix: do not allow set and delete env API calls --- README.md | 1 + base/src/js_worker.rs | 2 +- base/src/js_worker/env.rs | 32 +++++--------------------------- 3 files changed, 7 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 9cca241bc..79f1b3718 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ Server -> Base Worker -> User Function ## TODO * Check verify-jwt +* Workers should only have access to env variables assigned to it * handle 404 errors * better error messages for incorrect module loading paths * better error messages for invalid import map paths diff --git a/base/src/js_worker.rs b/base/src/js_worker.rs index 6e92c767f..b52923399 100644 --- a/base/src/js_worker.rs +++ b/base/src/js_worker.rs @@ -116,7 +116,7 @@ fn start_runtime( // TODO: check for other potential main paths (eg: index.js, index.tsx) let main_module_url = base_url.join("index.ts").unwrap(); - // Note: this will load Mozilla's CAs (we may also need to support + // Note: this will load Mozilla's CAs (we may also need to support system certs) let root_cert_store = deno_tls::create_default_root_cert_store(); let extensions_with_js = vec![ diff --git a/base/src/js_worker/env.rs b/base/src/js_worker/env.rs index 484d70b2a..b8ab090ef 100644 --- a/base/src/js_worker/env.rs +++ b/base/src/js_worker/env.rs @@ -1,7 +1,7 @@ use crate::js_worker::permissions::Permissions; -use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::error::{not_supported, type_error}; use deno_core::include_js_files; use deno_core::op; use deno_core::Extension; @@ -26,25 +26,8 @@ pub fn init() -> Extension { } #[op] -fn op_set_env(state: &mut OpState, key: String, value: String) -> Result<(), AnyError> { - state.borrow_mut::().check_env(&key)?; - if key.is_empty() { - return Err(type_error("Key is an empty string.")); - } - if key.contains(&['=', '\0'] as &[char]) { - return Err(type_error(format!( - "Key contains invalid characters: {:?}", - key - ))); - } - if value.contains('\0') { - return Err(type_error(format!( - "Value contains invalid characters: {:?}", - value - ))); - } - env::set_var(key, value); - Ok(()) +fn op_set_env(_state: &mut OpState, _key: String, _value: String) -> Result<(), AnyError> { + Err(not_supported()) } #[op] @@ -80,11 +63,6 @@ fn op_get_env(state: &mut OpState, key: String) -> Result, AnyErr } #[op] -fn op_delete_env(state: &mut OpState, key: String) -> Result<(), AnyError> { - state.borrow_mut::().check_env(&key)?; - if key.is_empty() || key.contains(&['=', '\0'] as &[char]) { - return Err(type_error("Key contains invalid characters.")); - } - env::remove_var(key); - Ok(()) +fn op_delete_env(_state: &mut OpState, _key: String) -> Result<(), AnyError> { + Err(not_supported()) } From 69bf544e401852fa7c3b2a95aa6a90a27d2f0899 Mon Sep 17 00:00:00 2001 From: Lakshan Perera Date: Wed, 1 Mar 2023 16:10:01 +1100 Subject: [PATCH 3/4] fix: request parsing error --- base/src/server.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/base/src/server.rs b/base/src/server.rs index b7af1c56b..78e553a2f 100644 --- a/base/src/server.rs +++ b/base/src/server.rs @@ -30,7 +30,7 @@ async fn process_stream( let mut headers = [httparse::EMPTY_HEADER; 64]; let mut req = httparse::Request::new(&mut headers); - let _ = req.parse(&buf).unwrap(); + let _ = req.parse(&buf)?; if req.path.is_none() { // if the path isn't found in first 1024 bytes it must not be a valid http @@ -130,7 +130,13 @@ impl Server { let no_module_cache = self.no_module_cache; let import_map_path_clone = self.import_map_path.clone(); tokio::task::spawn(async move { - let res = process_stream(stream, services_dir_clone, mem_limit, service_timeout, no_module_cache, import_map_path_clone).await; + let res = process_stream( + stream, + services_dir_clone, + mem_limit, + service_timeout, + no_module_cache, + import_map_path_clone).await; if res.is_err() { error!("{:?}", res.err().unwrap()); } From 2fb6d08879c3354e5d0b913172e48bc96ef753c7 Mon Sep 17 00:00:00 2001 From: Lakshan Perera Date: Thu, 2 Mar 2023 01:18:40 +1100 Subject: [PATCH 4/4] fix: refactor how env vars are passed to a worker --- base/src/commands.rs | 3 +++ base/src/js_worker.rs | 6 ++++++ base/src/js_worker/env.rs | 11 +++++------ base/src/server.rs | 10 +++++++++- cli/src/main.rs | 4 ++++ 5 files changed, 27 insertions(+), 7 deletions(-) diff --git a/base/src/commands.rs b/base/src/commands.rs index fac6d7d20..0c009995b 100644 --- a/base/src/commands.rs +++ b/base/src/commands.rs @@ -1,5 +1,6 @@ use crate::server::Server; use anyhow::Error; +use std::collections::HashMap; #[tokio::main] pub async fn start_server( @@ -10,6 +11,7 @@ pub async fn start_server( service_timeout: u16, no_module_cache: bool, import_map_path: Option, + env_vars: HashMap, ) -> Result<(), Error> { let server = Server::new( ip, @@ -19,6 +21,7 @@ pub async fn start_server( service_timeout, no_module_cache, import_map_path, + env_vars, )?; server.listen().await } diff --git a/base/src/js_worker.rs b/base/src/js_worker.rs index b52923399..bdb891603 100644 --- a/base/src/js_worker.rs +++ b/base/src/js_worker.rs @@ -7,6 +7,7 @@ use deno_core::JsRuntime; use deno_core::RuntimeOptions; use import_map::{parse_from_json, ImportMap, ImportMapDiagnostic}; use log::{debug, error, warn}; +use std::collections::HashMap; use std::fs; use std::panic; use std::path::Path; @@ -24,6 +25,7 @@ pub mod module_loader; pub mod net_override; pub mod permissions; pub mod runtime; +pub mod types; use module_loader::DefaultModuleLoader; use permissions::Permissions; @@ -34,6 +36,7 @@ pub async fn serve( worker_timeout_ms: u64, no_module_cache: bool, import_map_path: Option, + env_vars: HashMap, tcp_stream: TcpStream, ) -> Result<(), Error> { let service_path_clone = service_path.clone(); @@ -50,6 +53,7 @@ pub async fn serve( worker_timeout_ms, no_module_cache, import_map_path, + env_vars, tcp_stream_rx, shutdown_tx, ) @@ -102,6 +106,7 @@ fn start_runtime( worker_timeout_ms: u64, no_module_cache: bool, import_map_path: Option, + env_vars: HashMap, tcp_stream_rx: mpsc::UnboundedReceiver, shutdown_tx: oneshot::Sender<()>, ) { @@ -201,6 +206,7 @@ fn start_runtime( let op_state_rc = js_runtime.op_state(); let mut op_state = op_state_rc.borrow_mut(); op_state.put::>(tcp_stream_rx); + op_state.put::(env_vars); } let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle(); diff --git a/base/src/js_worker/env.rs b/base/src/js_worker/env.rs index b8ab090ef..2f92f4d1b 100644 --- a/base/src/js_worker/env.rs +++ b/base/src/js_worker/env.rs @@ -1,4 +1,5 @@ use crate::js_worker::permissions::Permissions; +use crate::js_worker::types::EnvVars; use deno_core::error::AnyError; use deno_core::error::{not_supported, type_error}; @@ -8,7 +9,6 @@ use deno_core::Extension; use deno_core::OpState; use deno_node::NODE_ENV_VAR_ALLOWLIST; use std::collections::HashMap; -use std::env; pub fn init() -> Extension { Extension::builder("custom:env") @@ -33,7 +33,8 @@ fn op_set_env(_state: &mut OpState, _key: String, _value: String) -> Result<(), #[op] fn op_env(state: &mut OpState) -> Result, AnyError> { state.borrow_mut::().check_env_all()?; - Ok(env::vars().collect()) + let env_vars = state.borrow::(); + Ok(env_vars.clone()) } #[op] @@ -55,10 +56,8 @@ fn op_get_env(state: &mut OpState, key: String) -> Result, AnyErr ))); } - let r = match env::var(key) { - Err(env::VarError::NotPresent) => None, - v => Some(v?), - }; + let env_vars = state.borrow::(); + let r = env_vars.get(&key).map(|k| k.clone()); Ok(r) } diff --git a/base/src/server.rs b/base/src/server.rs index 78e553a2f..91f3656bf 100644 --- a/base/src/server.rs +++ b/base/src/server.rs @@ -1,6 +1,7 @@ use crate::js_worker; use anyhow::{bail, Error}; use log::{debug, error, info}; +use std::collections::HashMap; use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::SocketAddr; @@ -17,6 +18,7 @@ async fn process_stream( service_timeout: u16, no_module_cache: bool, import_map_path: Option, + env_vars: HashMap, ) -> Result<(), Error> { // peek into the HTTP header // find request path @@ -75,6 +77,7 @@ async fn process_stream( worker_timeout_ms, no_module_cache, import_map_path, + env_vars, stream, ) .await?; @@ -90,6 +93,7 @@ pub struct Server { service_timeout: u16, no_module_cache: bool, import_map_path: Option, + env_vars: HashMap, } impl Server { @@ -101,6 +105,7 @@ impl Server { service_timeout: u16, no_module_cache: bool, import_map_path: Option, + env_vars: HashMap, ) -> Result { let ip = Ipv4Addr::from_str(ip)?; Ok(Self { @@ -111,6 +116,7 @@ impl Server { service_timeout, no_module_cache, import_map_path, + env_vars, }) } @@ -129,6 +135,7 @@ impl Server { let service_timeout = self.service_timeout; let no_module_cache = self.no_module_cache; let import_map_path_clone = self.import_map_path.clone(); + let env_vars_clone = self.env_vars.clone(); tokio::task::spawn(async move { let res = process_stream( stream, @@ -136,7 +143,8 @@ impl Server { mem_limit, service_timeout, no_module_cache, - import_map_path_clone).await; + import_map_path_clone, + env_vars_clone).await; if res.is_err() { error!("{:?}", res.err().unwrap()); } diff --git a/cli/src/main.rs b/cli/src/main.rs index 8db4fb49f..23677e1c0 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -4,6 +4,7 @@ use anyhow::Error; use base::commands::start_server; use clap::builder::FalseyValueParser; use clap::{arg, value_parser, ArgAction, Command}; +use std::env; fn cli() -> Command { Command::new("edge-runtime") @@ -74,6 +75,8 @@ fn main() { .cloned() .unwrap(); let import_map_path = sub_matches.get_one::("import-map").cloned(); + // CLI will provide all OS environment variables to a function + let env_vars = env::vars().collect(); exit_with_code(start_server( &ip.as_str(), @@ -83,6 +86,7 @@ fn main() { service_timeout, no_module_cache, import_map_path, + env_vars, )) } _ => {