Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions base/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,20 @@ use crate::server::Server;
use anyhow::Error;

#[tokio::main]
pub async fn start_server(ip: &str, port: u16, main_service_path: String) -> Result<(), Error> {
let mut server = Server::new(ip, port, main_service_path).await?;
pub async fn start_server(
ip: &str,
port: u16,
main_service_path: String,
import_map_path: Option<String>,
no_module_cache: bool,
) -> Result<(), Error> {
let mut server = Server::new(
ip,
port,
main_service_path,
import_map_path,
no_module_cache,
)
.await?;
server.listen().await
}
8 changes: 4 additions & 4 deletions base/src/js_worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::utils::units::{bytes_to_display, human_elapsed, mib_to_bytes};
use crate::worker_ctx::WorkerPoolMsg;
use crate::worker_ctx::UserWorkerMsgs;

use anyhow::{bail, Error};
use deno_core::located_script_name;
Expand Down Expand Up @@ -66,15 +66,15 @@ fn print_import_map_diagnostics(diagnostics: &[ImportMapDiagnostic]) {
pub struct MainWorker {
js_runtime: JsRuntime,
main_module_url: ModuleSpecifier,
worker_pool_tx: mpsc::UnboundedSender<WorkerPoolMsg>,
worker_pool_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
}

impl MainWorker {
pub fn new(
service_path: PathBuf,
no_module_cache: bool,
import_map_path: Option<String>,
worker_pool_tx: mpsc::UnboundedSender<WorkerPoolMsg>,
worker_pool_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
) -> Result<Self, Error> {
// Note: MainWorker
// - does not have memory or worker timeout [x]
Expand Down Expand Up @@ -179,7 +179,7 @@ impl MainWorker {
let op_state_rc = self.js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
op_state.put::<mpsc::UnboundedReceiver<UnixStream>>(unix_stream_rx);
op_state.put::<mpsc::UnboundedSender<WorkerPoolMsg>>(worker_pool_tx);
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(worker_pool_tx);
op_state.put::<types::EnvVars>(env_vars);
}

Expand Down
13 changes: 5 additions & 8 deletions base/src/js_worker/user_workers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerOptions, WorkerPoolMsg};
use crate::worker_ctx::{CreateUserWorkerResult, UserWorkerMsgs, UserWorkerOptions};

use deno_core::error::{custom_error, type_error, AnyError};
use deno_core::futures::stream::Peekable;
Expand Down Expand Up @@ -49,7 +49,7 @@ pub async fn op_user_worker_create(
env_vars_vec: Vec<(String, String)>,
) -> Result<String, AnyError> {
let op_state = state.borrow();
let tx = op_state.borrow::<mpsc::UnboundedSender<WorkerPoolMsg>>();
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
let (result_tx, result_rx) = oneshot::channel::<CreateUserWorkerResult>();

let mut env_vars = HashMap::new();
Expand All @@ -65,10 +65,7 @@ pub async fn op_user_worker_create(
import_map_path,
env_vars,
};
tx.send(WorkerPoolMsg::CreateUserWorker(
user_worker_options,
result_tx,
));
tx.send(UserWorkerMsgs::Create(user_worker_options, result_tx));

let result = result_rx.await;
if result.is_err() {
Expand Down Expand Up @@ -168,7 +165,7 @@ pub async fn op_user_worker_fetch(
req: UserWorkerRequest,
) -> Result<UserWorkerResponse, AnyError> {
let mut op_state = state.borrow_mut();
let tx = op_state.borrow::<mpsc::UnboundedSender<WorkerPoolMsg>>();
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();

let mut body = Body::empty();
Expand Down Expand Up @@ -197,7 +194,7 @@ pub async fn op_user_worker_fetch(
}
}

tx.send(WorkerPoolMsg::SendRequestToWorker(key, request, result_tx));
tx.send(UserWorkerMsgs::SendRequest(key, request, result_tx));

let result = result_rx.await;
if result.is_err() {
Expand Down
13 changes: 9 additions & 4 deletions base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ use std::future::Future;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::Path;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::task::Poll;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use url::Url;

struct WorkerService {
worker_ctx: Arc<RwLock<WorkerContext>>,
Expand Down Expand Up @@ -63,9 +61,16 @@ pub struct Server {
}

impl Server {
pub async fn new(ip: &str, port: u16, main_service_path: String) -> Result<Self, Error> {
pub async fn new(
ip: &str,
port: u16,
main_service_path: String,
import_map_path: Option<String>,
no_module_cache: bool,
) -> Result<Self, Error> {
// create a worker pool
let worker_pool = WorkerPool::new(main_service_path, None, false).await?;
let worker_pool =
WorkerPool::new(main_service_path, import_map_path, no_module_cache).await?;

let ip = Ipv4Addr::from_str(ip)?;
Ok(Self {
Expand Down
23 changes: 12 additions & 11 deletions base/src/worker_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct UserWorkerOptions {

pub struct MainWorkerOptions {
pub service_path: PathBuf,
pub worker_pool_tx: mpsc::UnboundedSender<WorkerPoolMsg>,
pub user_worker_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
pub no_module_cache: bool,
pub import_map_path: Option<String>,
}
Expand All @@ -39,7 +39,7 @@ impl WorkerContext {
let service_path = options.service_path;
let no_module_cache = options.no_module_cache;
let import_map_path = options.import_map_path;
let worker_pool_tx = options.worker_pool_tx;
let user_worker_msgs_tx = options.user_worker_msgs_tx;

// create a unix socket pair
let (sender_stream, recv_stream) = UnixStream::pair()?;
Expand All @@ -49,7 +49,7 @@ impl WorkerContext {
service_path.clone(),
no_module_cache,
import_map_path,
worker_pool_tx.clone(),
user_worker_msgs_tx.clone(),
)?;

// start the worker
Expand Down Expand Up @@ -145,9 +145,9 @@ pub struct CreateUserWorkerResult {
}

#[derive(Debug)]
pub enum WorkerPoolMsg {
CreateUserWorker(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
SendRequestToWorker(String, Request<Body>, oneshot::Sender<Response<Body>>),
pub enum UserWorkerMsgs {
Create(UserWorkerOptions, oneshot::Sender<CreateUserWorkerResult>),
SendRequest(String, Request<Body>, oneshot::Sender<Response<Body>>),
}

pub struct WorkerPool {
Expand All @@ -160,14 +160,15 @@ impl WorkerPool {
import_map_path: Option<String>,
no_module_cache: bool,
) -> Result<Self, Error> {
let (worker_pool_tx, mut worker_pool_rx) = mpsc::unbounded_channel::<WorkerPoolMsg>();
let (user_worker_msgs_tx, mut user_worker_msgs_rx) =
mpsc::unbounded_channel::<UserWorkerMsgs>();

let main_path = Path::new(&main_path);
let main_worker_ctx = WorkerContext::new_main_worker(MainWorkerOptions {
service_path: main_path.to_path_buf(),
import_map_path,
no_module_cache,
worker_pool_tx,
user_worker_msgs_tx,
})
.await?;
let main_worker = Arc::new(RwLock::new(main_worker_ctx));
Expand All @@ -176,9 +177,9 @@ impl WorkerPool {
let mut user_workers: HashMap<String, Arc<RwLock<WorkerContext>>> = HashMap::new();

loop {
match worker_pool_rx.recv().await {
match user_worker_msgs_rx.recv().await {
None => break,
Some(WorkerPoolMsg::CreateUserWorker(worker_options, tx)) => {
Some(UserWorkerMsgs::Create(worker_options, tx)) => {
let key = worker_options.service_path.display().to_string();
if !user_workers.contains_key(&key) {
// TODO: handle errors
Expand All @@ -191,7 +192,7 @@ impl WorkerPool {

tx.send(CreateUserWorkerResult { key });
}
Some(WorkerPoolMsg::SendRequestToWorker(key, req, tx)) => {
Some(UserWorkerMsgs::SendRequest(key, req, tx)) => {
// TODO: handle errors
let worker = user_workers.get(&key).unwrap();
let mut worker = worker.write().await;
Expand Down
12 changes: 8 additions & 4 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use anyhow::Error;
use base::commands::start_server;
use clap::builder::FalseyValueParser;
use clap::{arg, value_parser, ArgAction, Command};
use std::env;

fn cli() -> Command {
Command::new("edge-runtime")
.about("A server based on Deno runtime, capable of running JavaScript, TypeScript, and WASM services")
Expand Down Expand Up @@ -67,13 +65,19 @@ fn main() {
.get_one::<String>("main-service")
.cloned()
.unwrap();
let import_map_path = sub_matches.get_one::<String>("import-map").cloned();
let no_module_cache = sub_matches
.get_one::<bool>("disable-module-cache")
.cloned()
.unwrap();
let import_map_path = sub_matches.get_one::<String>("import-map").cloned();

exit_with_code(start_server(&ip.as_str(), port, main_service_path))
exit_with_code(start_server(
&ip.as_str(),
port,
main_service_path,
import_map_path,
no_module_cache,
))
}
_ => {
// unrecognized command
Expand Down