Skip to content

Commit

Permalink
Give ops a name
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed Apr 16, 2019
1 parent e48659b commit c9ebe45
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 96 deletions.
163 changes: 93 additions & 70 deletions cli/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use deno::js_check;
use deno::Buf;
use deno::JSError;
use deno::Op;
use deno::Tag;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
Expand Down Expand Up @@ -64,7 +65,8 @@ type OpCreator =
fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf)
-> Box<OpWithError>;

pub type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>;
pub type OpSelector =
fn(inner_type: msg::Any) -> Option<(&'static str, OpCreator)>;

#[inline]
fn empty_buf() -> Buf {
Expand All @@ -88,18 +90,17 @@ pub fn dispatch_all(
let inner_type = base.inner_type();
let cmd_id = base.cmd_id();

let op_func: OpCreator = match op_selector(inner_type) {
Some(v) => v,
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};
let (op_name, op_func) = op_selector(inner_type).unwrap_or_else(|| {
panic!("Unhandled message {}", msg::enum_name_any(inner_type))
});

let op: Box<OpWithError> = op_func(state, &base, zero_copy);
let op = op_func(&state, &base, zero_copy);

let state = state.clone();
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);

let boxed_op = Box::new(
op.or_else(move |err: DenoError| -> Result<Buf, ()> {
let boxed_op = op
.or_else(move |err: DenoError| -> Result<Buf, ()> {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a deno_buf.
Expand Down Expand Up @@ -132,8 +133,9 @@ pub fn dispatch_all(
};
state.metrics_op_completed(buf.len());
Ok(buf)
}).map_err(|err| panic!("unexpected error {:?}", err)),
);
}).map_err(|err| panic!("unexpected error {:?}", err))
.tag(op_name)
.box_op();

debug!(
"msg_from_js {} sync {}",
Expand All @@ -143,73 +145,94 @@ pub fn dispatch_all(
(base.sync(), boxed_op)
}

pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
msg::Any::WorkerGetMessage => Some(op_worker_get_message),
msg::Any::WorkerPostMessage => Some(op_worker_post_message),
msg::Any::Exit => Some(op_exit),
msg::Any::Start => Some(op_start),
_ => None,
}
pub fn op_selector_compiler(
inner_type: msg::Any,
) -> Option<(&'static str, OpCreator)> {
Some(match inner_type {
msg::Any::FetchModuleMetaData => {
("op_fetch_module_meta_data", op_fetch_module_meta_data)
}
msg::Any::WorkerGetMessage => {
("op_worker_get_message", op_worker_get_message)
}
msg::Any::WorkerPostMessage => {
("op_worker_post_message", op_worker_post_message)
}
msg::Any::Exit => ("op_exit", op_exit),
msg::Any::Start => ("op_exit", op_start),
_ => return None,
})
}

/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
msg::Any::Chdir => Some(op_chdir),
msg::Any::Chmod => Some(op_chmod),
msg::Any::Close => Some(op_close),
msg::Any::CopyFile => Some(op_copy_file),
msg::Any::Cwd => Some(op_cwd),
msg::Any::Dial => Some(op_dial),
msg::Any::Environ => Some(op_env),
msg::Any::Exit => Some(op_exit),
msg::Any::Fetch => Some(op_fetch),
msg::Any::FormatError => Some(op_format_error),
msg::Any::GlobalTimer => Some(op_global_timer),
msg::Any::GlobalTimerStop => Some(op_global_timer_stop),
msg::Any::IsTTY => Some(op_is_tty),
msg::Any::Link => Some(op_link),
msg::Any::Listen => Some(op_listen),
msg::Any::MakeTempDir => Some(op_make_temp_dir),
msg::Any::Metrics => Some(op_metrics),
msg::Any::Mkdir => Some(op_mkdir),
msg::Any::Now => Some(op_now),
msg::Any::Open => Some(op_open),
msg::Any::PermissionRevoke => Some(op_revoke_permission),
msg::Any::Permissions => Some(op_permissions),
msg::Any::Read => Some(op_read),
msg::Any::ReadDir => Some(op_read_dir),
msg::Any::Readlink => Some(op_read_link),
msg::Any::Remove => Some(op_remove),
msg::Any::Rename => Some(op_rename),
msg::Any::ReplReadline => Some(op_repl_readline),
msg::Any::ReplStart => Some(op_repl_start),
msg::Any::Resources => Some(op_resources),
msg::Any::Run => Some(op_run),
msg::Any::RunStatus => Some(op_run_status),
msg::Any::Seek => Some(op_seek),
msg::Any::SetEnv => Some(op_set_env),
msg::Any::Shutdown => Some(op_shutdown),
msg::Any::Start => Some(op_start),
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
msg::Any::CreateWorker => Some(op_create_worker),
msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
msg::Any::HostGetMessage => Some(op_host_get_message),
msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::Write => Some(op_write),
pub fn op_selector_std(
inner_type: msg::Any,
) -> Option<(&'static str, OpCreator)> {
Some(match inner_type {
msg::Any::Accept => ("op_accept", op_accept),
msg::Any::Chdir => ("op_chdir", op_chdir),
msg::Any::Chmod => ("op_chmod", op_chmod),
msg::Any::Close => ("op_close", op_close),
msg::Any::CopyFile => ("op_copy_file", op_copy_file),
msg::Any::Cwd => ("op_cwd", op_cwd),
msg::Any::Dial => ("op_dial", op_dial),
msg::Any::Environ => ("op_env", op_env),
msg::Any::Exit => ("op_exit", op_exit),
msg::Any::Fetch => ("op_fetch", op_fetch),
msg::Any::FormatError => ("op_format_error", op_format_error),
msg::Any::GlobalTimer => ("op_global_timer", op_global_timer),
msg::Any::GlobalTimerStop => ("op_global_timer_stop", op_global_timer_stop),
msg::Any::IsTTY => ("op_is_tty", op_is_tty),
msg::Any::Link => ("op_link", op_link),
msg::Any::Listen => ("op_listen", op_listen),
msg::Any::MakeTempDir => ("op_make_temp_dir", op_make_temp_dir),
msg::Any::Metrics => ("op_metrics", op_metrics),
msg::Any::Mkdir => ("op_mkdir", op_mkdir),
msg::Any::Now => ("op_now", op_now),
msg::Any::Open => ("op_open", op_open),
msg::Any::PermissionRevoke => {
("op_revoke_permission", op_revoke_permission)
}
msg::Any::Permissions => ("op_permissions", op_permissions),
msg::Any::Read => ("op_read", op_read),
msg::Any::ReadDir => ("op_read_dir", op_read_dir),
msg::Any::Readlink => ("op_read_link", op_read_link),
msg::Any::Remove => ("op_remove", op_remove),
msg::Any::Rename => ("op_rename", op_rename),
msg::Any::ReplReadline => ("op_repl_readline", op_repl_readline),
msg::Any::ReplStart => ("op_repl_start", op_repl_start),
msg::Any::Resources => ("op_resources", op_resources),
msg::Any::Run => ("op_run", op_run),
msg::Any::RunStatus => ("op_run_status", op_run_status),
msg::Any::Seek => ("op_seek", op_seek),
msg::Any::SetEnv => ("op_set_env", op_set_env),
msg::Any::Shutdown => ("op_shutdown", op_shutdown),
msg::Any::Start => ("op_start", op_start),
msg::Any::Stat => ("op_stat", op_stat),
msg::Any::Symlink => ("op_symlink", op_symlink),
msg::Any::Truncate => ("op_truncate", op_truncate),
msg::Any::CreateWorker => ("op_create_worker", op_create_worker),
msg::Any::HostGetWorkerClosed => {
("op_host_get_worker_closed", op_host_get_worker_closed)
}
msg::Any::HostGetMessage => ("op_host_get_message", op_host_get_message),
msg::Any::HostPostMessage => ("op_host_post_message", op_host_post_message),
msg::Any::Write => ("op_write", op_write),

// TODO(ry) split these out so that only the appropriate Workers can access
// them.
msg::Any::WorkerGetMessage => Some(op_worker_get_message),
msg::Any::WorkerPostMessage => Some(op_worker_post_message),
msg::Any::FetchModuleMetaData => {
("op_fetch_module_meta_data", op_fetch_module_meta_data)
}
msg::Any::WorkerGetMessage => {
("op_worker_get_message", op_worker_get_message)
}

_ => None,
}
msg::Any::WorkerPostMessage => {
("op_worker_post_message", op_worker_post_message)
}
_ => return None,
})
}

// Returns a milliseconds and nanoseconds subsec
Expand Down
40 changes: 20 additions & 20 deletions core/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,50 +120,50 @@ impl Dispatch for HttpBench {
) -> (bool, Box<Op>) {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
let (op_name, http_bench_op) = match record.op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
("op_listen", op_listen())
}
OP_CLOSE => {
assert!(is_sync);
let rid = record.arg;
op_close(rid)
("op_close", op_close(rid))
}
OP_ACCEPT => {
assert!(!is_sync);
let listener_rid = record.arg;
op_accept(listener_rid)
("op_accept", op_accept(listener_rid))
}
OP_READ => {
assert!(!is_sync);
let rid = record.arg;
op_read(rid, zero_copy_buf)
("op_read", op_read(rid, zero_copy_buf))
}
OP_WRITE => {
assert!(!is_sync);
let rid = record.arg;
op_write(rid, zero_copy_buf)
("op_write", op_write(rid, zero_copy_buf))
}
_ => panic!("bad op {}", record.op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();

let op = Box::new(
http_bench_op
.and_then(move |result| {
record_a.result = result;
Ok(record_a)
}).or_else(|err| -> Result<Record, ()> {
eprintln!("unexpected err {}", err);
record_b.result = -1;
Ok(record_b)
}).then(|result| -> Result<Buf, ()> {
let record = result.unwrap();
Ok(record.into())
}),
);
let op = http_bench_op
.and_then(move |result| {
record_a.result = result;
Ok(record_a)
}).or_else(|err| -> Result<Record, ()> {
eprintln!("unexpected err {}", err);
record_b.result = -1;
Ok(record_b)
}).then(|result| -> Result<Buf, ()> {
let record = result.unwrap();
Ok(record.into())
}).tag(op_name)
.box_op();

(is_sync, op)
}
}
Expand Down
56 changes: 50 additions & 6 deletions core/isolate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,55 @@ use futures::Poll;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::ptr::null;
use std::sync::{Arc, Mutex, Once, ONCE_INIT};

pub type Buf = Box<[u8]>;
pub type Op = dyn Future<Item = Buf, Error = ()> + Send;

pub struct Tagged<F> {
future: F,
tag: &'static str,
}
impl<F> Future for Tagged<F>
where
F: Future,
{
type Item = F::Item;
type Error = F::Error;
fn poll(&mut self) -> Poll<F::Item, F::Error> {
self.future.poll()
}
}
impl<F> Debug for Tagged<F> {
fn fmt(&self, f: &mut Formatter) -> FmtResult {
f.write_str(self.tag)
}
}

pub trait Tag
where
Self: Future + Sized,
{
fn tag(self, tag: &'static str) -> Tagged<Self> {
Tagged { future: self, tag }
}
}
impl<T> Tag for T where T: Future + Sized {}

pub trait Op: Future<Item = Buf, Error = ()> + Debug + Send {
fn box_op(self) -> Box<dyn Op>;
}
impl<T> Op for T
where
T: Future<Item = Buf, Error = ()> + Debug + Send + Sized + 'static,
{
fn box_op(self) -> Box<dyn Op> {
Box::new(self)
}
}

#[derive(Debug)]
struct PendingOp {
op: Box<Op>,
zero_copy_id: usize, // non-zero if associated zero-copy buffer.
Expand Down Expand Up @@ -521,6 +564,7 @@ pub fn js_check(r: Result<(), JSError>) {
pub mod tests {
use super::*;
use futures::executor::spawn;
use futures::future;
use futures::future::lazy;
use futures::future::ok;
use futures::Async;
Expand Down Expand Up @@ -603,12 +647,12 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
(false, future::ok(buf).tag("AsyncImmediate").box_op())
}
TestDispatchMode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
(true, Box::new(futures::future::ok(buf)))
(true, future::ok(buf).tag("OverflowReqSync").box_op())
}
TestDispatchMode::OverflowResSync => {
assert_eq!(control.len(), 1);
Expand All @@ -617,12 +661,12 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
let buf = vec.into_boxed_slice();
(true, Box::new(futures::future::ok(buf)))
(true, future::ok(buf).tag("OverflowResSync").box_op())
}
TestDispatchMode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
(false, future::ok(buf).tag("OverflowReqAsync").box_op())
}
TestDispatchMode::OverflowResAsync => {
assert_eq!(control.len(), 1);
Expand All @@ -631,7 +675,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
(false, future::ok(buf).tag("OverflowResAsync").box_op())
}
}
}
Expand Down

0 comments on commit c9ebe45

Please sign in to comment.