Skip to content
Permalink
Browse files

Merge Worker and Isolate types (denoland#2078)

Reduces generics.
  • Loading branch information...
ry committed Apr 8, 2019
1 parent 734cf78 commit 2debbdacb935cfe1eb7bb8d1f40a5063b339d90b
Showing with 359 additions and 631 deletions.
  1. +1 −13 cli/cli_behavior.rs
  2. +57 −114 cli/compiler.rs
  3. +16 −22 cli/isolate_state.rs
  4. +9 −9 cli/main.rs
  5. +141 −158 cli/ops.rs
  6. +10 −7 cli/resources.rs
  7. +121 −22 cli/{isolate.rs → worker.rs}
  8. +0 −284 cli/workers.rs
  9. +0 −1 js/compiler.ts
  10. +4 −1 js/workers.ts
@@ -17,24 +17,12 @@ impl CliBehavior {
}
}

impl IsolateStateContainer for &CliBehavior {
fn state(&self) -> Arc<IsolateState> {
self.state.clone()
}
}

impl IsolateStateContainer for CliBehavior {
fn state(&self) -> Arc<IsolateState> {
self.state.clone()
}
}

impl Behavior for CliBehavior {
fn dispatch(
&mut self,
control: &[u8],
zero_copy: deno_buf,
) -> (bool, Box<Op>) {
ops::dispatch_all(self, control, zero_copy, ops::op_selector_std)
ops::dispatch_all(&self.state, control, zero_copy, ops::op_selector_std)
}
}
@@ -1,22 +1,17 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::flags::DenoFlags;
use crate::cli_behavior::CliBehavior;
use crate::isolate_state::*;
use crate::js_errors;
use crate::js_errors::JSErrorColor;
use crate::msg;
use crate::ops;
use crate::resources;
use crate::resources::ResourceId;
use crate::startup_data;
use crate::tokio_util;
use crate::workers;
use crate::workers::WorkerBehavior;
use crate::workers::WorkerInit;
use deno::deno_buf;
use deno::Behavior;
use crate::worker::Worker;
use deno::js_check;
use deno::Buf;
use deno::JSError;
use deno::Op;
use futures::future::*;
use futures::sync::oneshot;
use futures::Future;
@@ -44,51 +39,6 @@ lazy_static! {
static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
}

pub struct CompilerBehavior {
pub state: Arc<IsolateState>,
}

impl CompilerBehavior {
pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
Self {
state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
}
}
}

impl IsolateStateContainer for CompilerBehavior {
fn state(&self) -> Arc<IsolateState> {
self.state.clone()
}
}

impl IsolateStateContainer for &CompilerBehavior {
fn state(&self) -> Arc<IsolateState> {
self.state.clone()
}
}

impl Behavior for CompilerBehavior {
fn dispatch(
&mut self,
control: &[u8],
zero_copy: deno_buf,
) -> (bool, Box<Op>) {
ops::dispatch_all(self, control, zero_copy, ops::op_selector_compiler)
}
}

impl WorkerBehavior for CompilerBehavior {
fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
self.state = Arc::new(IsolateState::new(
self.state.flags.clone(),
self.state.argv.clone(),
Some(worker_channels),
true,
));
}
}

// This corresponds to JS ModuleMetaData.
// TODO Rename one or the other so they correspond.
#[derive(Debug, Clone)]
@@ -142,74 +92,67 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> ResourceId {
let mut cell = C_RID.lock().unwrap();
cell
.get_or_insert_with(|| {
let worker_result = workers::spawn(
let child_state = Arc::new(IsolateState::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
));
let rid = child_state.resource.rid;
let resource = child_state.resource.clone();
let behavior = CliBehavior::new(child_state);

let mut worker = Worker::new(
"TS".to_string(),
startup_data::compiler_isolate_init(),
CompilerBehavior::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
),
"TS",
WorkerInit::Script("compilerMain()".to_string()),
behavior,
);
match worker_result {
Ok(worker) => {
let rid = worker.resource.rid;
let mut runtime = C_RUNTIME.lock().unwrap();
runtime.spawn(lazy(move || {
let resource = worker.resource.clone();
worker.then(move |result| -> Result<(), ()> {
// Close resource so the future created by
// handle_worker_message_stream exits
resource.close();
debug!("Compiler worker exited!");
if let Err(e) = result {
eprintln!("{}", JSErrorColor(&e).to_string());
}
std::process::exit(1);
})
}));
runtime.spawn(lazy(move || {
debug!("Start worker stream handler!");
let worker_stream = resources::get_message_stream_from_worker(rid);
worker_stream
.for_each(|msg: Buf| {
// All worker responses are handled here first before being sent via
// their respective sender. This system can be compared to the
// promise system used on the js side. This provides a way to
// resolve many futures via the same channel.
let res_json = std::str::from_utf8(&msg).unwrap();
debug!("Got message from worker: {}", res_json);
// Get the intended receiver's cmd_id from the message.
let cmd_id = parse_cmd_id(res_json);
let mut table = C_RES_SENDER_TABLE.lock().unwrap();
debug!("Cmd id for get message handler: {}", cmd_id);
// Get the corresponding response sender from the table and
// send a response.
let response_sender = table.remove(&(cmd_id as CmdId)).unwrap();
response_sender.send(msg).unwrap();
Ok(())
}).map_err(|_| ())
}));
rid
}
Err(err) => {
println!("{}", err.to_string());

js_check(worker.execute("denoMain()"));
js_check(worker.execute("workerMain()"));
js_check(worker.execute("compilerMain()"));

let mut runtime = C_RUNTIME.lock().unwrap();
runtime.spawn(lazy(move || {
worker.then(move |result| -> Result<(), ()> {
// Close resource so the future created by
// handle_worker_message_stream exits
resource.close();
debug!("Compiler worker exited!");
if let Err(e) = result {
eprintln!("{}", JSErrorColor(&e).to_string());
}
std::process::exit(1);
}
}
})
}));
runtime.spawn(lazy(move || {
debug!("Start worker stream handler!");
let worker_stream = resources::get_message_stream_from_worker(rid);
worker_stream
.for_each(|msg: Buf| {
// All worker responses are handled here first before being sent via
// their respective sender. This system can be compared to the
// promise system used on the js side. This provides a way to
// resolve many futures via the same channel.
let res_json = std::str::from_utf8(&msg).unwrap();
debug!("Got message from worker: {}", res_json);
// Get the intended receiver's cmd_id from the message.
let cmd_id = parse_cmd_id(res_json);
let mut table = C_RES_SENDER_TABLE.lock().unwrap();
debug!("Cmd id for get message handler: {}", cmd_id);
// Get the corresponding response sender from the table and
// send a response.
let response_sender = table.remove(&(cmd_id as CmdId)).unwrap();
response_sender.send(msg).unwrap();
Ok(())
}).map_err(|_| ())
}));
rid
}).clone()
}

fn req(
specifier: &str,
referrer: &str,
is_worker_main: bool,
cmd_id: u32,
) -> Buf {
fn req(specifier: &str, referrer: &str, cmd_id: u32) -> Buf {
json!({
"specifier": specifier,
"referrer": referrer,
"isWorker": is_worker_main,
"cmdId": cmd_id,
}).to_string()
.into_boxed_str()
@@ -228,7 +171,7 @@ pub fn compile_async(
);
let cmd_id = new_cmd_id();

let req_msg = req(&specifier, &referrer, parent_state.is_worker, cmd_id);
let req_msg = req(&specifier, &referrer, cmd_id);
let module_meta_data_ = module_meta_data.clone();

let compiler_rid = lazy_start(parent_state.clone());
@@ -362,7 +305,7 @@ mod tests {
fn test_parse_cmd_id() {
let cmd_id = new_cmd_id();

let msg = req("Hello", "World", false, cmd_id);
let msg = req("Hello", "World", cmd_id);

let res_json = std::str::from_utf8(&msg).unwrap();

@@ -5,25 +5,23 @@ use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::modules::Modules;
use crate::permissions::DenoPermissions;
use crate::resources;
use crate::resources::ResourceId;
use crate::workers::UserWorkerBehavior;
use crate::workers::Worker;
use crate::worker::Worker;
use deno::Buf;
use futures::future::Shared;
use futures::sync::mpsc as async_mpsc;
use std;
use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
use tokio::sync::mpsc as async_mpsc;

pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);
pub type UserWorkerTable =
HashMap<ResourceId, Shared<Worker<UserWorkerBehavior>>>;
pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;

// AtomicU64 is currently unstable
#[derive(Default)]
@@ -48,34 +46,35 @@ pub struct IsolateState {
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub modules: Mutex<Modules>,
pub worker_channels: Option<Mutex<WorkerChannels>>,
pub worker_channels: Mutex<WorkerChannels>,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<UserWorkerTable>,
pub is_worker: bool,
pub start_time: Instant,
pub resource: resources::Resource,
}

impl IsolateState {
pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
is_worker: bool,
) -> Self {
pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();

let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
let internal_channels = (worker_out_tx, worker_in_rx);
let external_channels = (worker_in_tx, worker_out_rx);
let resource = resources::add_worker(external_channels);

Self {
dir: deno_dir::DenoDir::new(custom_root).unwrap(),
argv: argv_rest,
permissions: DenoPermissions::from_flags(&flags),
flags,
metrics: Metrics::default(),
modules: Mutex::new(Modules::new()),
worker_channels: worker_channels.map(Mutex::new),
worker_channels: Mutex::new(internal_channels),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(UserWorkerTable::new()),
is_worker,
start_time: Instant::now(),
resource,
}
}

@@ -126,7 +125,7 @@ impl IsolateState {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv) = flags::set_flags(argv).unwrap();
IsolateState::new(flags, rest_argv, None, false)
IsolateState::new(flags, rest_argv)
}

pub fn metrics_op_dispatched(
@@ -153,8 +152,3 @@ impl IsolateState {
.fetch_add(bytes_received, Ordering::SeqCst);
}
}

/// Provides state getter function
pub trait IsolateStateContainer {
fn state(&self) -> Arc<IsolateState>;
}
Oops, something went wrong.

0 comments on commit 2debbda

Please sign in to comment.
You can’t perform that action at this time.