Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@ docker build -t edge-runtime .
docker run -it --rm -p 9000:9000 -v /path/to/supabase/functions:/usr/services supabase/edge-runtime start --dir /usr/services
```

## Architecture

Server -> Base Worker -> User Function

## TODO

* Support import maps
* Check verify-jwt
* better error messages for incorrect module loading paths (local)
* Workers should only have access to env variables assigned to it
* handle 404 errors
* better error messages for incorrect module loading paths
* better error messages for invalid import map paths
* Support snapshotting the runtime
* Support for private modules (DENO_AUTH_TOKENS)
* HTTP/2 support (need the host to support SSL)
Expand Down
3 changes: 3 additions & 0 deletions base/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::server::Server;
use anyhow::Error;
use std::collections::HashMap;

#[tokio::main]
pub async fn start_server(
Expand All @@ -10,6 +11,7 @@ pub async fn start_server(
service_timeout: u16,
no_module_cache: bool,
import_map_path: Option<String>,
env_vars: HashMap<String, String>,
) -> Result<(), Error> {
let server = Server::new(
ip,
Expand All @@ -19,6 +21,7 @@ pub async fn start_server(
service_timeout,
no_module_cache,
import_map_path,
env_vars,
)?;
server.listen().await
}
35 changes: 31 additions & 4 deletions base/src/js_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use deno_core::JsRuntime;
use deno_core::RuntimeOptions;
use import_map::{parse_from_json, ImportMap, ImportMapDiagnostic};
use log::{debug, error, warn};
use std::collections::HashMap;
use std::fs;
use std::panic;
use std::path::Path;
Expand All @@ -24,6 +25,7 @@ pub mod module_loader;
pub mod net_override;
pub mod permissions;
pub mod runtime;
pub mod types;

use module_loader::DefaultModuleLoader;
use permissions::Permissions;
Expand All @@ -34,6 +36,7 @@ pub async fn serve(
worker_timeout_ms: u64,
no_module_cache: bool,
import_map_path: Option<String>,
env_vars: HashMap<String, String>,
tcp_stream: TcpStream,
) -> Result<(), Error> {
let service_path_clone = service_path.clone();
Expand All @@ -50,11 +53,13 @@ pub async fn serve(
worker_timeout_ms,
no_module_cache,
import_map_path,
env_vars,
tcp_stream_rx,
shutdown_tx,
)
});

// send the tcp stram to the worker
tcp_stream_tx.send(tcp_stream)?;

debug!("js worker for {} started", service_name);
Expand Down Expand Up @@ -94,12 +99,14 @@ fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
);
}
}

fn start_runtime(
service_path: PathBuf,
memory_limit_mb: u64,
worker_timeout_ms: u64,
no_module_cache: bool,
import_map_path: Option<String>,
env_vars: HashMap<String, String>,
tcp_stream_rx: mpsc::UnboundedReceiver<TcpStream>,
shutdown_tx: oneshot::Sender<()>,
) {
Expand All @@ -114,7 +121,7 @@ fn start_runtime(
// TODO: check for other potential main paths (eg: index.js, index.tsx)
let main_module_url = base_url.join("index.ts").unwrap();

// Note: this will load Mozilla's CAs (we may also need to support
// Note: this will load Mozilla's CAs (we may also need to support system certs)
let root_cert_store = deno_tls::create_default_root_cert_store();

let extensions_with_js = vec![
Expand Down Expand Up @@ -165,8 +172,8 @@ fn start_runtime(
..Default::default()
});

let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle();
let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<u64>();
let (halt_execution_tx, mut halt_execution_rx) = oneshot::channel::<()>();

// add a callback when a worker reaches its memory limit
js_runtime.add_near_heap_limit_callback(move |cur, _init| {
Expand Down Expand Up @@ -199,9 +206,16 @@ fn start_runtime(
let op_state_rc = js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
op_state.put::<mpsc::UnboundedReceiver<TcpStream>>(tcp_stream_rx);
op_state.put::<types::EnvVars>(env_vars);
}

start_controller_thread(v8_thread_safe_handle, worker_timeout_ms, memory_limit_rx);
let v8_thread_safe_handle = js_runtime.v8_isolate().thread_safe_handle();
start_controller_thread(
v8_thread_safe_handle,
worker_timeout_ms,
memory_limit_rx,
halt_execution_tx,
);

let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -211,7 +225,15 @@ fn start_runtime(
let future = async move {
let mod_id = js_runtime.load_main_module(&main_module_url, None).await?;
let result = js_runtime.mod_evaluate(mod_id);
js_runtime.run_event_loop(false).await?;

tokio::select! {
_ = js_runtime.run_event_loop(false) => {
debug!("event loop completed");
}
_ = &mut halt_execution_rx => {
debug!("worker exectution halted");
}
}

result.await?
};
Expand All @@ -230,6 +252,7 @@ fn start_controller_thread(
v8_thread_safe_handle: v8::IsolateHandle,
worker_timeout_ms: u64,
mut memory_limit_rx: mpsc::UnboundedReceiver<u64>,
halt_execution_tx: oneshot::Sender<()>,
) {
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -255,5 +278,9 @@ fn start_controller_thread(
} else {
debug!("worker already terminated");
}

halt_execution_tx
.send(())
.expect("failed to send halt execution signal");
});
}
43 changes: 10 additions & 33 deletions base/src/js_worker/env.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::js_worker::permissions::Permissions;
use crate::js_worker::types::EnvVars;

use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::error::{not_supported, type_error};
use deno_core::include_js_files;
use deno_core::op;
use deno_core::Extension;
use deno_core::OpState;
use deno_node::NODE_ENV_VAR_ALLOWLIST;
use std::collections::HashMap;
use std::env;

pub fn init() -> Extension {
Extension::builder("custom:env")
Expand All @@ -26,31 +26,15 @@ pub fn init() -> Extension {
}

#[op]
fn op_set_env(state: &mut OpState, key: String, value: String) -> Result<(), AnyError> {
state.borrow_mut::<Permissions>().check_env(&key)?;
if key.is_empty() {
return Err(type_error("Key is an empty string."));
}
if key.contains(&['=', '\0'] as &[char]) {
return Err(type_error(format!(
"Key contains invalid characters: {:?}",
key
)));
}
if value.contains('\0') {
return Err(type_error(format!(
"Value contains invalid characters: {:?}",
value
)));
}
env::set_var(key, value);
Ok(())
fn op_set_env(_state: &mut OpState, _key: String, _value: String) -> Result<(), AnyError> {
Err(not_supported())
}

#[op]
fn op_env(state: &mut OpState) -> Result<HashMap<String, String>, AnyError> {
state.borrow_mut::<Permissions>().check_env_all()?;
Ok(env::vars().collect())
let env_vars = state.borrow::<EnvVars>();
Ok(env_vars.clone())
}

#[op]
Expand All @@ -72,19 +56,12 @@ fn op_get_env(state: &mut OpState, key: String) -> Result<Option<String>, AnyErr
)));
}

let r = match env::var(key) {
Err(env::VarError::NotPresent) => None,
v => Some(v?),
};
let env_vars = state.borrow::<EnvVars>();
let r = env_vars.get(&key).map(|k| k.clone());
Ok(r)
}

#[op]
fn op_delete_env(state: &mut OpState, key: String) -> Result<(), AnyError> {
state.borrow_mut::<Permissions>().check_env(&key)?;
if key.is_empty() || key.contains(&['=', '\0'] as &[char]) {
return Err(type_error("Key contains invalid characters."));
}
env::remove_var(key);
Ok(())
fn op_delete_env(_state: &mut OpState, _key: String) -> Result<(), AnyError> {
Err(not_supported())
}
18 changes: 16 additions & 2 deletions base/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::js_worker;
use anyhow::{bail, Error};
use log::{debug, error, info};
use std::collections::HashMap;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
Expand All @@ -17,6 +18,7 @@ async fn process_stream(
service_timeout: u16,
no_module_cache: bool,
import_map_path: Option<String>,
env_vars: HashMap<String, String>,
) -> Result<(), Error> {
// peek into the HTTP header
// find request path
Expand All @@ -30,7 +32,7 @@ async fn process_stream(

let mut headers = [httparse::EMPTY_HEADER; 64];
let mut req = httparse::Request::new(&mut headers);
let _ = req.parse(&buf).unwrap();
let _ = req.parse(&buf)?;

if req.path.is_none() {
// if the path isn't found in first 1024 bytes it must not be a valid http
Expand Down Expand Up @@ -75,6 +77,7 @@ async fn process_stream(
worker_timeout_ms,
no_module_cache,
import_map_path,
env_vars,
stream,
)
.await?;
Expand All @@ -90,6 +93,7 @@ pub struct Server {
service_timeout: u16,
no_module_cache: bool,
import_map_path: Option<String>,
env_vars: HashMap<String, String>,
}

impl Server {
Expand All @@ -101,6 +105,7 @@ impl Server {
service_timeout: u16,
no_module_cache: bool,
import_map_path: Option<String>,
env_vars: HashMap<String, String>,
) -> Result<Self, Error> {
let ip = Ipv4Addr::from_str(ip)?;
Ok(Self {
Expand All @@ -111,6 +116,7 @@ impl Server {
service_timeout,
no_module_cache,
import_map_path,
env_vars,
})
}

Expand All @@ -129,8 +135,16 @@ impl Server {
let service_timeout = self.service_timeout;
let no_module_cache = self.no_module_cache;
let import_map_path_clone = self.import_map_path.clone();
let env_vars_clone = self.env_vars.clone();
tokio::task::spawn(async move {
let res = process_stream(stream, services_dir_clone, mem_limit, service_timeout, no_module_cache, import_map_path_clone).await;
let res = process_stream(
stream,
services_dir_clone,
mem_limit,
service_timeout,
no_module_cache,
import_map_path_clone,
env_vars_clone).await;
if res.is_err() {
error!("{:?}", res.err().unwrap());
}
Expand Down
4 changes: 4 additions & 0 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Error;
use base::commands::start_server;
use clap::builder::FalseyValueParser;
use clap::{arg, value_parser, ArgAction, Command};
use std::env;

fn cli() -> Command {
Command::new("edge-runtime")
Expand Down Expand Up @@ -74,6 +75,8 @@ fn main() {
.cloned()
.unwrap();
let import_map_path = sub_matches.get_one::<String>("import-map").cloned();
// CLI will provide all OS environment variables to a function
let env_vars = env::vars().collect();

exit_with_code(start_server(
&ip.as_str(),
Expand All @@ -83,6 +86,7 @@ fn main() {
service_timeout,
no_module_cache,
import_map_path,
env_vars,
))
}
_ => {
Expand Down