diff --git a/cli/dispatch_minimal.rs b/cli/dispatch_minimal.rs index dd6a962e21ca0..322094be2d880 100644 --- a/cli/dispatch_minimal.rs +++ b/cli/dispatch_minimal.rs @@ -8,33 +8,26 @@ use crate::state::ThreadSafeState; use deno::Buf; use deno::CoreOp; use deno::Op; +use deno::OpId; use deno::PinnedBuf; use futures::Future; -const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE; -const OP_READ: i32 = 1; -const OP_WRITE: i32 = 2; +const OP_READ: OpId = 1; +const OP_WRITE: OpId = 2; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. pub struct Record { pub promise_id: i32, - pub op_id: i32, pub arg: i32, pub result: i32, } impl Into for Record { fn into(self) -> Buf { - let vec = vec![ - DISPATCH_MINIMAL_TOKEN, - self.promise_id, - self.op_id, - self.arg, - self.result, - ]; + let vec = vec![self.promise_id, self.arg, self.result]; let buf32 = vec.into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4]; + let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; unsafe { Box::from_raw(ptr) } } } @@ -48,32 +41,25 @@ pub fn parse_min_record(bytes: &[u8]) -> Option { let p32 = p as *const i32; let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }; - if s.len() < 5 { + if s.len() != 3 { return None; } let ptr = s.as_ptr(); - let ints = unsafe { std::slice::from_raw_parts(ptr, 5) }; - if ints[0] != DISPATCH_MINIMAL_TOKEN { - return None; - } + let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; Some(Record { - promise_id: ints[1], - op_id: ints[2], - arg: ints[3], - result: ints[4], + promise_id: ints[0], + arg: ints[1], + result: ints[2], }) } #[test] fn test_parse_min_record() { - let buf = vec![ - 0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, - ]; + let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]; assert_eq!( parse_min_record(&buf), Some(Record { promise_id: 1, - op_id: 2, arg: 3, result: 4, }) @@ -88,11 +74,12 @@ fn test_parse_min_record() { pub fn dispatch_minimal( state: &ThreadSafeState, + op_id: OpId, mut record: Record, zero_copy: Option, ) -> CoreOp { let is_sync = record.promise_id == 0; - let min_op = match record.op_id { + let min_op = match op_id { OP_READ => ops::read(record.arg, zero_copy), OP_WRITE => ops::write(record.arg, zero_copy), _ => unimplemented!(), diff --git a/cli/ops.rs b/cli/ops.rs index 98528f9795253..d75e7bd2015c3 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -27,14 +27,7 @@ use crate::tokio_write; use crate::version; use crate::worker::Worker; use atty; -use deno::Buf; -use deno::CoreOp; -use deno::ErrBox; -use deno::Loader; -use deno::ModuleSpecifier; -use deno::Op; -use deno::OpResult; -use deno::PinnedBuf; +use deno::*; use flatbuffers::FlatBufferBuilder; use futures; use futures::Async; @@ -82,16 +75,20 @@ fn empty_buf() -> Buf { Box::new([]) } +const FLATBUFFER_OP_ID: OpId = 44; + pub fn dispatch_all( state: &ThreadSafeState, + op_id: OpId, control: &[u8], zero_copy: Option, op_selector: OpSelector, ) -> CoreOp { let bytes_sent_control = control.len(); let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); - let op = if let Some(min_record) = parse_min_record(control) { - dispatch_minimal(state, min_record, zero_copy) + let op = if op_id != FLATBUFFER_OP_ID { + let min_record = parse_min_record(control).unwrap(); + dispatch_minimal(state, op_id, min_record, zero_copy) } else { dispatch_all_legacy(state, control, zero_copy, op_selector) }; diff --git a/cli/state.rs b/cli/state.rs index f4e3d9c84164a..e1480c02722ae 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -20,6 +20,7 @@ use deno::CoreOp; use deno::ErrBox; use deno::Loader; use deno::ModuleSpecifier; +use deno::OpId; use deno::PinnedBuf; use futures::future::Shared; use futures::Future; @@ -104,10 +105,11 @@ impl Deref for ThreadSafeState { impl ThreadSafeState { pub fn dispatch( &self, + op_id: OpId, control: &[u8], zero_copy: Option, ) -> CoreOp { - ops::dispatch_all(self, control, zero_copy, self.dispatch_selector) + ops::dispatch_all(self, op_id, control, zero_copy, self.dispatch_selector) } } diff --git a/cli/worker.rs b/cli/worker.rs index f180628369282..f707f4a58f872 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -29,8 +29,8 @@ impl Worker { { let mut i = isolate.lock().unwrap(); let state_ = state.clone(); - i.set_dispatch(move |control_buf, zero_copy_buf| { - state_.dispatch(control_buf, zero_copy_buf) + i.set_dispatch(move |op_id, control_buf, zero_copy_buf| { + state_.dispatch(op_id, control_buf, zero_copy_buf) }); let state_ = state.clone(); i.set_js_error_create(move |v8_exception| { diff --git a/core/core.d.ts b/core/core.d.ts index b1d1ac57f486b..1e9eb7c04ea1b 100644 --- a/core/core.d.ts +++ b/core/core.d.ts @@ -5,11 +5,12 @@ // Deno and therefore do not flow through to the runtime type library. declare interface MessageCallback { - (msg: Uint8Array): void; + (opId: number, msg: Uint8Array): void; } declare interface DenoCore { dispatch( + opId: number, control: Uint8Array, zeroCopy?: ArrayBufferView | null ): Uint8Array | null; diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 8eb764b55d1c1..4c68f2be64e7b 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -29,20 +29,19 @@ function createResolvable() { return Object.assign(promise, methods); } -const scratch32 = new Int32Array(4); +const scratch32 = new Int32Array(3); const scratchBytes = new Uint8Array( scratch32.buffer, scratch32.byteOffset, scratch32.byteLength ); -assert(scratchBytes.byteLength === 4 * 4); +assert(scratchBytes.byteLength === 3 * 4); function send(promiseId, opId, arg, zeroCopy = null) { scratch32[0] = promiseId; - scratch32[1] = opId; - scratch32[2] = arg; - scratch32[3] = -1; - return Deno.core.dispatch(scratchBytes, zeroCopy); + scratch32[1] = arg; + scratch32[2] = -1; + return Deno.core.dispatch(opId, scratchBytes, zeroCopy); } /** Returns Promise */ @@ -55,13 +54,12 @@ function sendAsync(opId, arg, zeroCopy = null) { } function recordFromBuf(buf) { - assert(buf.byteLength === 16); + assert(buf.byteLength === 3 * 4); const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); return { promiseId: buf32[0], - opId: buf32[1], - arg: buf32[2], - result: buf32[3] + arg: buf32[1], + result: buf32[2] }; } @@ -72,7 +70,7 @@ function sendSync(opId, arg) { return record.result; } -function handleAsyncMsgFromRust(buf) { +function handleAsyncMsgFromRust(opId, buf) { const record = recordFromBuf(buf); const { promiseId, result } = record; const p = promiseMap.get(promiseId); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 73a4720c2cd9c..3c077562d68c4 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -36,25 +36,23 @@ impl log::Log for Logger { fn flush(&self) {} } -const OP_LISTEN: i32 = 1; -const OP_ACCEPT: i32 = 2; -const OP_READ: i32 = 3; -const OP_WRITE: i32 = 4; -const OP_CLOSE: i32 = 5; +const OP_LISTEN: OpId = 1; +const OP_ACCEPT: OpId = 2; +const OP_READ: OpId = 3; +const OP_WRITE: OpId = 4; +const OP_CLOSE: OpId = 5; #[derive(Clone, Debug, PartialEq)] pub struct Record { pub promise_id: i32, - pub op_id: i32, pub arg: i32, pub result: i32, } impl Into for Record { fn into(self) -> Buf { - let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result] - .into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 16]; + let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice(); + let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; unsafe { Box::from_raw(ptr) } } } @@ -63,28 +61,26 @@ impl From<&[u8]> for Record { fn from(s: &[u8]) -> Record { #[allow(clippy::cast_ptr_alignment)] let ptr = s.as_ptr() as *const i32; - let ints = unsafe { std::slice::from_raw_parts(ptr, 4) }; + let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; Record { promise_id: ints[0], - op_id: ints[1], - arg: ints[2], - result: ints[3], + arg: ints[1], + result: ints[2], } } } impl From for Record { fn from(buf: Buf) -> Record { - assert_eq!(buf.len(), 4 * 4); + assert_eq!(buf.len(), 3 * 4); #[allow(clippy::cast_ptr_alignment)] - let ptr = Box::into_raw(buf) as *mut [i32; 4]; + let ptr = Box::into_raw(buf) as *mut [i32; 3]; let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; - assert_eq!(ints.len(), 4); + assert_eq!(ints.len(), 3); Record { promise_id: ints[0], - op_id: ints[1], - arg: ints[2], - result: ints[3], + arg: ints[1], + result: ints[2], } } } @@ -93,7 +89,6 @@ impl From for Record { fn test_record_from() { let r = Record { promise_id: 1, - op_id: 2, arg: 3, result: 4, }; @@ -102,7 +97,7 @@ fn test_record_from() { #[cfg(target_endian = "little")] assert_eq!( buf, - vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() + vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() ); let actual = Record::from(buf); assert_eq!(actual, expected); @@ -111,10 +106,14 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future + Send; -fn dispatch(control: &[u8], zero_copy_buf: Option) -> CoreOp { +fn dispatch( + op_id: OpId, + control: &[u8], + zero_copy_buf: Option, +) -> CoreOp { let record = Record::from(control); let is_sync = record.promise_id == 0; - let http_bench_op = match record.op_id { + let http_bench_op = match op_id { OP_LISTEN => { assert!(is_sync); op_listen() @@ -139,7 +138,7 @@ fn dispatch(control: &[u8], zero_copy_buf: Option) -> CoreOp { let rid = record.arg; op_write(rid, zero_copy_buf) } - _ => panic!("bad op {}", record.op_id), + _ => panic!("bad op {}", op_id), }; let mut record_a = record.clone(); let mut record_b = record.clone(); diff --git a/core/isolate.rs b/core/isolate.rs index 0f693ff92dba1..d3ac4457e2f80 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -12,6 +12,7 @@ use crate::libdeno::deno_buf; use crate::libdeno::deno_dyn_import_id; use crate::libdeno::deno_mod; use crate::libdeno::deno_pinned_buf; +use crate::libdeno::OpId; use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; @@ -33,6 +34,9 @@ pub type Buf = Box<[u8]>; pub type OpAsyncFuture = Box + Send>; +type PendingOpFuture = + Box + Send>; + pub enum Op { Sync(Buf), Async(OpAsyncFuture), @@ -40,10 +44,13 @@ pub enum Op { pub type CoreError = (); -type CoreOpAsyncFuture = OpAsyncFuture; - pub type CoreOp = Op; +pub type OpResult = Result, E>; + +/// Args: op_id, control_buf, zero_copy_buf +type CoreDispatchFn = dyn Fn(OpId, &[u8], Option) -> CoreOp; + /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -76,10 +83,6 @@ pub enum StartupData<'a> { None, } -pub type OpResult = Result, E>; - -type CoreDispatchFn = dyn Fn(&[u8], Option) -> CoreOp; - pub type DynImportFuture = Box + Send>; type DynImportFn = dyn Fn(&str, &str) -> DynImportFuture; @@ -121,7 +124,7 @@ pub struct Isolate { js_error_create: Arc, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered, + pending_ops: FuturesUnordered, pending_dyn_imports: FuturesUnordered, have_unpolled_ops: bool, startup_script: Option, @@ -198,7 +201,7 @@ impl Isolate { /// corresponds to the second argument of Deno.core.dispatch(). pub fn set_dispatch(&mut self, f: F) where - F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, + F: Fn(OpId, &[u8], Option) -> CoreOp + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -265,13 +268,14 @@ impl Isolate { extern "C" fn pre_dispatch( user_data: *mut c_void, - control_argv0: deno_buf, + op_id: OpId, + control_buf: deno_buf, zero_copy_buf: deno_pinned_buf, ) { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let op = if let Some(ref f) = isolate.dispatch { - f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf)) + f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { panic!("isolate.dispatch not set") }; @@ -280,13 +284,17 @@ impl Isolate { match op { Op::Sync(buf) => { // For sync messages, we always return the response via Deno.core.send's - // return value. - // TODO(ry) check that if JSError thrown during respond(), that it will be - // picked up. - let _ = isolate.respond(Some(&buf)); + // return value. Sync messages ignore the op_id. + let op_id = 0; + isolate + .respond(Some((op_id, &buf))) + // Because this is a sync op, deno_respond() does not actually call + // into JavaScript. We should not get an error here. + .expect("unexpected error"); } Op::Async(fut) => { - isolate.pending_ops.push(fut); + let fut2 = fut.map(move |buf| (op_id, buf)); + isolate.pending_ops.push(Box::new(fut2)); isolate.have_unpolled_ops = true; } } @@ -347,13 +355,16 @@ impl Isolate { } } - fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), ErrBox> { - let buf = match maybe_buf { - None => deno_buf::empty(), - Some(r) => deno_buf::from(r), + fn respond( + &mut self, + maybe_buf: Option<(OpId, &[u8])>, + ) -> Result<(), ErrBox> { + let (op_id, buf) = match maybe_buf { + None => (0, deno_buf::empty()), + Some((op_id, r)) => (op_id, deno_buf::from(r)), }; unsafe { - libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), op_id, buf) } self.check_last_exception() } @@ -541,7 +552,7 @@ impl Future for Isolate { fn poll(&mut self) -> Poll<(), ErrBox> { self.shared_init(); - let mut overflow_response: Option = None; + let mut overflow_response: Option<(OpId, Buf)> = None; loop { // If there are any pending dyn_import futures, do those first. @@ -567,13 +578,13 @@ impl Future for Isolate { Err(_) => panic!("unexpected op error"), Ok(Ready(None)) => break, Ok(NotReady) => break, - Ok(Ready(Some(buf))) => { - let successful_push = self.shared.push(&buf); + Ok(Ready(Some((op_id, buf)))) => { + let successful_push = self.shared.push(op_id, &buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the // legacy route, using the argument of deno_respond. - overflow_response = Some(buf); + overflow_response = Some((op_id, buf)); break; } } @@ -592,8 +603,8 @@ impl Future for Isolate { if overflow_response.is_some() { // Lock the current thread for V8. let locker = LockerScope::new(self.libdeno_isolate); - let buf = overflow_response.take().unwrap(); - self.respond(Some(&buf))?; + let (op_id, buf) = overflow_response.take().unwrap(); + self.respond(Some((op_id, &buf)))?; drop(locker); } @@ -633,10 +644,11 @@ impl IsolateHandle { } } -pub fn js_check(r: Result<(), ErrBox>) { +pub fn js_check(r: Result) -> T { if let Err(e) = r { panic!(e.to_string()); } + r.unwrap() } #[cfg(test)] @@ -689,7 +701,8 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut isolate = Isolate::new(StartupData::None, false); - isolate.set_dispatch(move |control, _| -> CoreOp { + isolate.set_dispatch(move |op_id, control, _| -> CoreOp { + println!("op_id {}", op_id); dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { @@ -749,9 +762,9 @@ pub mod tests { "filename.js", r#" let control = new Uint8Array([42]); - Deno.core.send(control); + Deno.core.send(42, control); async function main() { - Deno.core.send(control); + Deno.core.send(42, control); } main(); "#, @@ -770,7 +783,7 @@ pub mod tests { import { b } from 'b.js' if (b() != 'b') throw Error(); let control = new Uint8Array([42]); - Deno.core.send(control); + Deno.core.send(42, control); "#, ) .unwrap(); @@ -816,7 +829,7 @@ pub mod tests { "setup2.js", r#" let nrecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { nrecv++; }); "#, @@ -827,7 +840,7 @@ pub mod tests { r#" assert(nrecv == 0); let control = new Uint8Array([42]); - Deno.core.send(control); + Deno.core.send(42, control); assert(nrecv == 0); "#, )); @@ -838,7 +851,7 @@ pub mod tests { "check2.js", r#" assert(nrecv == 1); - Deno.core.send(control); + Deno.core.send(42, control); assert(nrecv == 1); "#, )); @@ -1016,10 +1029,10 @@ pub mod tests { "overflow_req_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response instanceof Uint8Array); assert(response.length == 1); assert(response[0] == 43); @@ -1038,10 +1051,10 @@ pub mod tests { "overflow_res_sync.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { asyncRecv++ }); + Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response instanceof Uint8Array); assert(response.length == 100 * 1024 * 1024); assert(response[0] == 99); @@ -1059,21 +1072,22 @@ pub mod tests { "overflow_req_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId == 99); assert(buf.byteLength === 1); assert(buf[0] === 43); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); // Async messages always have null response. assert(response == null); assert(asyncRecv == 0); "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); - assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + assert_eq!(Async::Ready(()), js_check(isolate.poll())); js_check(isolate.execute("check.js", "assert(asyncRecv == 1);")); }); } @@ -1088,14 +1102,15 @@ pub mod tests { "overflow_res_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId == 99); assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response == null); assert(asyncRecv == 0); "#, @@ -1116,19 +1131,20 @@ pub mod tests { "overflow_res_multiple_dispatch_async.js", r#" let asyncRecv = 0; - Deno.core.setAsyncHandler((buf) => { + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId === 99); assert(buf.byteLength === 100 * 1024 * 1024); assert(buf[0] === 4); asyncRecv++; }); // Large message that will overflow the shared space. let control = new Uint8Array([42]); - let response = Deno.core.dispatch(control); + let response = Deno.core.dispatch(99, control); assert(response == null); assert(asyncRecv == 0); // Dispatch another message to verify that pending ops // are done even if shared space overflows - Deno.core.dispatch(control); + Deno.core.dispatch(99, control); "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); diff --git a/core/lib.rs b/core/lib.rs index 61521aecb0002..9be1c3891789c 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -18,6 +18,7 @@ pub use crate::flags::v8_set_flags; pub use crate::isolate::*; pub use crate::js_errors::*; pub use crate::libdeno::deno_mod; +pub use crate::libdeno::OpId; pub use crate::libdeno::PinnedBuf; pub use crate::module_specifier::*; pub use crate::modules::*; diff --git a/core/libdeno.rs b/core/libdeno.rs index c402d87548c94..071f6ddf500c2 100644 --- a/core/libdeno.rs +++ b/core/libdeno.rs @@ -12,6 +12,8 @@ use std::ptr::null; use std::ptr::NonNull; use std::slice; +pub type OpId = u32; + // TODO(F001): change this definition to `extern { pub type isolate; }` // After RFC 1861 is stablized. See https://github.com/rust-lang/rust/issues/43467. #[repr(C)] @@ -188,7 +190,8 @@ impl Snapshot2<'_> { #[allow(non_camel_case_types)] type deno_recv_cb = unsafe extern "C" fn( user_data: *mut c_void, - control_buf: deno_buf, // deprecated + op_id: OpId, + control_buf: deno_buf, zero_copy_buf: deno_pinned_buf, ); @@ -266,6 +269,7 @@ extern "C" { pub fn deno_respond( i: *const isolate, user_data: *const c_void, + op_id: OpId, buf: deno_buf, ); pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf); diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc index 61eeb43ca4670..1e6b5dfbf9bfe 100644 --- a/core/libdeno/api.cc +++ b/core/libdeno/api.cc @@ -159,10 +159,11 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) { auto _ = deno::PinnedBuf(buf); } -void deno_respond(Deno* d_, void* user_data, deno_buf buf) { +void deno_respond(Deno* d_, void* user_data, deno_op_id op_id, deno_buf buf) { auto* d = unwrap(d_); if (d->current_args_ != nullptr) { // Synchronous response. + // Note op_id is not passed back in the case of synchronous response. if (buf.data_ptr != nullptr) { auto ab = deno::ImportBuf(d, buf); d->current_args_->GetReturnValue().Set(ab); @@ -187,12 +188,13 @@ void deno_respond(Deno* d_, void* user_data, deno_buf buf) { return; } - v8::Local args[1]; + v8::Local args[2]; int argc = 0; if (buf.data_ptr != nullptr) { - args[0] = deno::ImportBuf(d, buf); - argc = 1; + args[0] = v8::Integer::New(d->isolate_, op_id); + args[1] = deno::ImportBuf(d, buf); + argc = 2; } auto v = recv_->Call(context, context->Global(), argc, args); diff --git a/core/libdeno/binding.cc b/core/libdeno/binding.cc index da582a3bfa9d1..291e62f01caa5 100644 --- a/core/libdeno/binding.cc +++ b/core/libdeno/binding.cc @@ -223,22 +223,29 @@ void Send(const v8::FunctionCallbackInfo& args) { v8::HandleScope handle_scope(isolate); deno_buf control = {nullptr, 0}; - if (args[0]->IsArrayBufferView()) { - auto view = v8::Local::Cast(args[0]); + + int32_t op_id = 0; + if (args[0]->IsInt32()) { + auto context = d->context_.Get(isolate); + op_id = args[0]->Int32Value(context).FromJust(); + } + + if (args[1]->IsArrayBufferView()) { + auto view = v8::Local::Cast(args[1]); auto data = reinterpret_cast(view->Buffer()->GetContents().Data()); control = {data + view->ByteOffset(), view->ByteLength()}; } PinnedBuf zero_copy = - args[1]->IsArrayBufferView() - ? PinnedBuf(v8::Local::Cast(args[1])) + args[2]->IsArrayBufferView() + ? PinnedBuf(v8::Local::Cast(args[2])) : PinnedBuf(); DCHECK_NULL(d->current_args_); d->current_args_ = &args; - d->recv_cb_(d->user_data_, control, zero_copy.IntoRaw()); + d->recv_cb_(d->user_data_, op_id, control, zero_copy.IntoRaw()); if (d->current_args_ == nullptr) { // This indicates that deno_repond() was called already. diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h index fe52148486f45..2c248a87e142e 100644 --- a/core/libdeno/deno.h +++ b/core/libdeno/deno.h @@ -28,10 +28,22 @@ typedef struct { typedef struct deno_s Deno; -// A callback to receive a message from a libdeno.send() javascript call. +typedef uint32_t deno_op_id; + +// A callback to receive a message from a Deno.core.send() javascript call. // control_buf is valid for only for the lifetime of this callback. // data_buf is valid until deno_respond() is called. -typedef void (*deno_recv_cb)(void* user_data, deno_buf control_buf, +// +// op_id corresponds to the first argument of Deno.core.send(). +// op_id is an extra user-defined integer valued which is not interpreted by +// libdeno. +// +// control_buf corresponds to the second argument of Deno.core.send(). +// +// zero_copy_buf corresponds to the third argument of Deno.core.send(). +// The user must call deno_pinned_buf_delete on each zero_copy_buf received. +typedef void (*deno_recv_cb)(void* user_data, deno_op_id op_id, + deno_buf control_buf, deno_pinned_buf zero_copy_buf); typedef int deno_dyn_import_id; @@ -49,7 +61,7 @@ typedef struct { int will_snapshot; // Default 0. If calling deno_snapshot_new 1. deno_snapshot load_snapshot; // A startup snapshot to use. deno_buf shared; // Shared buffer to be mapped to libdeno.shared - deno_recv_cb recv_cb; // Maps to libdeno.send() calls. + deno_recv_cb recv_cb; // Maps to Deno.core.send() calls. deno_dyn_import_cb dyn_import_cb; } deno_config; @@ -78,21 +90,25 @@ void deno_unlock(Deno* d); void deno_execute(Deno* d, void* user_data, const char* js_filename, const char* js_source); -// deno_respond sends up to one message back for every deno_recv_cb made. +// deno_respond sends one message back for every deno_recv_cb made. // -// If this is called during deno_recv_cb, the issuing libdeno.send() in +// If this is called during deno_recv_cb, the issuing Deno.core.send() in // javascript will synchronously return the specified buf as an ArrayBuffer (or // null if buf is empty). // // If this is called after deno_recv_cb has returned, the deno_respond -// will call into the JS callback specified by libdeno.recv(). +// will call into the JS callback specified by Deno.core.recv(). // // (Ideally, but not currently: After calling deno_respond(), the caller no // longer owns `buf` and must not use it; deno_respond() is responsible for // releasing its memory.) // +// op_id is an extra user-defined integer valued which is not currently +// interpreted by libdeno. But it should probably correspond to the op_id in +// deno_recv_cb. +// // If a JS exception was encountered, deno_last_exception() will be non-NULL. -void deno_respond(Deno* d, void* user_data, deno_buf buf); +void deno_respond(Deno* d, void* user_data, deno_op_id op_id, deno_buf buf); // consumes zero_copy void deno_pinned_buf_delete(deno_pinned_buf* buf); diff --git a/core/libdeno/libdeno.d.ts b/core/libdeno/libdeno.d.ts index 1bc7367d95916..8a26e49ca3265 100644 --- a/core/libdeno/libdeno.d.ts +++ b/core/libdeno/libdeno.d.ts @@ -13,13 +13,14 @@ interface EvalErrorInfo { } declare interface MessageCallback { - (msg: Uint8Array): void; + (opId: number, msg: Uint8Array): void; } declare interface DenoCore { recv(cb: MessageCallback): void; send( + opId: number, control: null | ArrayBufferView, data?: ArrayBufferView ): null | Uint8Array; diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc index 16a4a11f61a7c..a72793944b0c3 100644 --- a/core/libdeno/libdeno_test.cc +++ b/core/libdeno/libdeno_test.cc @@ -49,7 +49,8 @@ void assert_null(deno_pinned_buf b) { TEST(LibDenoTest, RecvReturnEmpty) { static int count = 0; - auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { + auto recv_cb = [](auto _, deno_op_id op_id, auto buf, auto zero_copy_buf) { + EXPECT_EQ(op_id, 42u); assert_null(zero_copy_buf); count++; EXPECT_EQ(static_cast(3), buf.data_len); @@ -64,9 +65,43 @@ TEST(LibDenoTest, RecvReturnEmpty) { deno_delete(d); } +TEST(LibDenoTest, BasicRecv) { + static int count = 0; + auto recv_cb = [](auto user_data, deno_op_id op_id, auto buf, + auto zero_copy_buf) { + EXPECT_EQ(op_id, 42u); + // auto d = reinterpret_cast(user_data); + assert_null(zero_copy_buf); + count++; + EXPECT_EQ(static_cast(3), buf.data_len); + EXPECT_EQ(buf.data_ptr[0], 1); + EXPECT_EQ(buf.data_ptr[1], 2); + EXPECT_EQ(buf.data_ptr[2], 3); + }; + Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr}); + deno_execute(d, d, "a.js", "BasicRecv()"); + EXPECT_EQ(nullptr, deno_last_exception(d)); + EXPECT_EQ(count, 1); + deno_check_promise_errors(d); + EXPECT_EQ(deno_last_exception(d), nullptr); + { + deno_lock(d); + uint8_t response[] = {'b', 'a', 'r'}; + deno_respond(d, nullptr, 43, {response, sizeof response}); + deno_unlock(d); + } + EXPECT_EQ(count, 2); + EXPECT_EQ(nullptr, deno_last_exception(d)); + deno_check_promise_errors(d); + EXPECT_EQ(deno_last_exception(d), nullptr); + deno_delete(d); +} + TEST(LibDenoTest, RecvReturnBar) { static int count = 0; - auto recv_cb = [](auto user_data, auto buf, auto zero_copy_buf) { + auto recv_cb = [](auto user_data, deno_op_id op_id, auto buf, + auto zero_copy_buf) { + EXPECT_EQ(op_id, 42u); auto d = reinterpret_cast(user_data); assert_null(zero_copy_buf); count++; @@ -75,7 +110,7 @@ TEST(LibDenoTest, RecvReturnBar) { EXPECT_EQ(buf.data_ptr[1], 'b'); EXPECT_EQ(buf.data_ptr[2], 'c'); uint8_t response[] = {'b', 'a', 'r'}; - deno_respond(d, user_data, {response, sizeof response}); + deno_respond(d, user_data, op_id, {response, sizeof response}); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr}); deno_execute(d, d, "a.js", "RecvReturnBar()"); @@ -126,8 +161,9 @@ TEST(LibDenoTest, GlobalErrorHandling) { TEST(LibDenoTest, ZeroCopyBuf) { static int count = 0; static deno_pinned_buf zero_copy_buf2; - auto recv_cb = [](auto user_data, deno_buf buf, + auto recv_cb = [](auto user_data, deno_op_id op_id, deno_buf buf, deno_pinned_buf zero_copy_buf) { + EXPECT_EQ(op_id, 42u); count++; EXPECT_NE(zero_copy_buf.pin, nullptr); zero_copy_buf.data_ptr[0] = 4; @@ -155,7 +191,9 @@ TEST(LibDenoTest, ZeroCopyBuf) { TEST(LibDenoTest, CheckPromiseErrors) { static int count = 0; - auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { count++; }; + auto recv_cb = [](auto _, deno_op_id op_id, auto buf, auto zero_copy_buf) { + count++; + }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr}); EXPECT_EQ(deno_last_exception(d), nullptr); deno_execute(d, nullptr, "a.js", "CheckPromiseErrors()"); @@ -264,7 +302,8 @@ TEST(LibDenoTest, SharedAtomics) { TEST(LibDenoTest, WasmInstantiate) { static int count = 0; - auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { + auto recv_cb = [](auto _, deno_op_id op_id, auto buf, auto zero_copy_buf) { + EXPECT_EQ(op_id, 42u); EXPECT_EQ(buf.data_len, 1u); EXPECT_EQ(buf.data_ptr[0], 42); count++; diff --git a/core/libdeno/libdeno_test.js b/core/libdeno/libdeno_test.js index 006c71666a570..1c765539178fb 100644 --- a/core/libdeno/libdeno_test.js +++ b/core/libdeno/libdeno_test.js @@ -28,15 +28,30 @@ global.TypedArraySnapshots = () => { global.RecvReturnEmpty = () => { const m1 = new Uint8Array("abc".split("").map(c => c.charCodeAt(0))); const m2 = m1.slice(); - const r1 = Deno.core.send(m1); + const r1 = Deno.core.send(42, m1); assert(r1 == null); - const r2 = Deno.core.send(m2); + const r2 = Deno.core.send(42, m2); assert(r2 == null); }; +global.BasicRecv = () => { + const m = new Uint8Array([1, 2, 3]); + Deno.core.recv((opId, buf) => { + assert(opId === 43); + assert(buf instanceof Uint8Array); + assert(buf.byteLength === 3); + const s = String.fromCharCode(...buf); + assert(s === "bar"); + const r = Deno.core.send(42, m); + assert(!r); // async + }); + const r = Deno.core.send(42, m); + assert(!r); // async +}; + global.RecvReturnBar = () => { const m = new Uint8Array("abc".split("").map(c => c.charCodeAt(0))); - const r = Deno.core.send(m); + const r = Deno.core.send(42, m); assert(r instanceof Uint8Array); assert(r.byteLength === 3); const rstr = String.fromCharCode(...r); @@ -58,7 +73,7 @@ global.SendRecvSlice = () => { buf[0] = 100 + i; buf[buf.length - 1] = 100 - i; // On the native side, the slice is shortened by 19 bytes. - buf = Deno.core.send(buf); + buf = Deno.core.send(42, buf); assert(buf.byteOffset === i * 11); assert(buf.byteLength === abLen - i * 30 - 19); assert(buf.buffer.byteLength == abLen); @@ -78,17 +93,17 @@ global.JSSendArrayBufferViewTypes = () => { const ab1 = new ArrayBuffer(4321); const u8 = new Uint8Array(ab1, 2468, 1000); u8[0] = 1; - Deno.core.send(u8); + Deno.core.send(42, u8); // Send Uint32Array. const ab2 = new ArrayBuffer(4321); const u32 = new Uint32Array(ab2, 2468, 1000 / Uint32Array.BYTES_PER_ELEMENT); u32[0] = 0x02020202; - Deno.core.send(u32); + Deno.core.send(42, u32); // Send DataView. const ab3 = new ArrayBuffer(4321); const dv = new DataView(ab3, 2468, 1000); dv.setUint8(0, 3); - Deno.core.send(dv); + Deno.core.send(42, dv); }; // The following join has caused SnapshotBug to segfault when using kKeep. @@ -110,7 +125,7 @@ global.ZeroCopyBuf = () => { const b = zeroCopyBuf; // The second parameter of send should modified by the // privileged side. - const r = Deno.core.send(a, b); + const r = Deno.core.send(42, a, b); assert(r == null); // b is different. assert(b[0] === 4); @@ -129,7 +144,7 @@ global.CheckPromiseErrors = () => { try { await fn(); } catch (e) { - Deno.core.send(new Uint8Array([42])); + Deno.core.send(42, new Uint8Array([42])); } })(); }; @@ -239,17 +254,17 @@ global.WasmInstantiate = () => { ]); (async () => { - Deno.core.send(new Uint8Array([42])); + Deno.core.send(42, new Uint8Array([42])); const wasm = await WebAssembly.instantiate(bytes); - Deno.core.send(new Uint8Array([42])); + Deno.core.send(42, new Uint8Array([42])); const result = wasm.instance.exports.add(1, 3); if (result != 4) { throw Error("bad"); } // To signal success, we send back a fixed buffer. - Deno.core.send(new Uint8Array([42])); + Deno.core.send(42, new Uint8Array([42])); })(); }; diff --git a/core/libdeno/modules_test.cc b/core/libdeno/modules_test.cc index 987e88791f894..e11231528acb3 100644 --- a/core/libdeno/modules_test.cc +++ b/core/libdeno/modules_test.cc @@ -2,9 +2,11 @@ #include "test.h" static int exec_count = 0; -void recv_cb(void* user_data, deno_buf buf, deno_pinned_buf zero_copy_buf) { +void recv_cb(void* user_data, deno_op_id op_id, deno_buf buf, + deno_pinned_buf zero_copy_buf) { // We use this to check that scripts have executed. EXPECT_EQ(1u, buf.data_len); + EXPECT_EQ(42u, op_id); EXPECT_EQ(buf.data_ptr[0], 4); EXPECT_EQ(zero_copy_buf.data_ptr, nullptr); EXPECT_EQ(zero_copy_buf.data_len, 0u); @@ -20,7 +22,7 @@ TEST(ModulesTest, Resolution) { static deno_mod a = deno_mod_new(d, true, "a.js", "import { b } from 'b.js'\n" "if (b() != 'b') throw Error();\n" - "Deno.core.send(new Uint8Array([4]));"); + "Deno.core.send(42, new Uint8Array([4]));"); EXPECT_NE(a, 0); EXPECT_EQ(nullptr, deno_last_exception(d)); @@ -72,7 +74,7 @@ TEST(ModulesTest, ResolutionError) { static deno_mod a = deno_mod_new(d, true, "a.js", "import 'bad'\n" - "Deno.core.send(new Uint8Array([4]));"); + "Deno.core.send(42, new Uint8Array([4]));"); EXPECT_NE(a, 0); EXPECT_EQ(nullptr, deno_last_exception(d)); @@ -106,7 +108,7 @@ TEST(ModulesTest, ImportMetaUrl) { static deno_mod a = deno_mod_new(d, true, "a.js", "if ('a.js' != import.meta.url) throw 'hmm'\n" - "Deno.core.send(new Uint8Array([4]));"); + "Deno.core.send(42, new Uint8Array([4]));"); EXPECT_NE(a, 0); EXPECT_EQ(nullptr, deno_last_exception(d)); @@ -165,7 +167,7 @@ TEST(ModulesTest, DynamicImportSuccess) { " let mod = await import('foo'); \n" " assert(mod.b() === 'b'); \n" // Send a message to signify that we're done. - " Deno.core.send(new Uint8Array([4])); \n" + " Deno.core.send(42, new Uint8Array([4])); \n" "})(); \n"; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, dyn_import_cb}); static deno_mod a = deno_mod_new(d, true, "a.js", src); @@ -206,7 +208,7 @@ TEST(ModulesTest, DynamicImportError) { "(async () => { \n" " let mod = await import('foo'); \n" // The following should be unreachable. - " Deno.core.send(new Uint8Array([4])); \n" + " Deno.core.send(42, new Uint8Array([4])); \n" "})(); \n"; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, dyn_import_cb}); static deno_mod a = deno_mod_new(d, true, "a.js", src); @@ -249,7 +251,7 @@ TEST(ModulesTest, DynamicImportAsync) { " mod = await import('foo'); \n" " assert(mod.b() === 'b'); \n" // Send a message to signify that we're done. - " Deno.core.send(new Uint8Array([4])); \n" + " Deno.core.send(42, new Uint8Array([4])); \n" "})(); \n"; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, dyn_import_cb}); static deno_mod a = deno_mod_new(d, true, "a.js", src); @@ -327,7 +329,7 @@ TEST(ModulesTest, DynamicImportThrows) { "(async () => { \n" " let mod = await import('b.js'); \n" // unreachable - " Deno.core.send(new Uint8Array([4])); \n" + " Deno.core.send(42, new Uint8Array([4])); \n" "})(); \n"; static deno_mod a = deno_mod_new(d, true, "a.js", a_src); EXPECT_NE(a, 0); @@ -401,7 +403,7 @@ TEST(ModulesTest, DynamicImportSyntaxError) { "(async () => { \n" " let mod = await import('b.js'); \n" // unreachable - " Deno.core.send(new Uint8Array([4])); \n" + " Deno.core.send(42, new Uint8Array([4])); \n" "})(); \n"; static deno_mod a = deno_mod_new(d, true, "a.js", src); EXPECT_NE(a, 0); diff --git a/core/shared_queue.js b/core/shared_queue.js index 1b338b052715b..b69f1b422e464 100644 --- a/core/shared_queue.js +++ b/core/shared_queue.js @@ -26,7 +26,7 @@ SharedQueue Binary Layout const INDEX_NUM_SHIFTED_OFF = 1; const INDEX_HEAD = 2; const INDEX_OFFSETS = 3; - const INDEX_RECORDS = 3 + MAX_RECORDS; + const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS; const HEAD_INIT = 4 * INDEX_RECORDS; // Available on start due to bindings. @@ -84,13 +84,17 @@ SharedQueue Binary Layout return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; } - function setEnd(index, end) { - shared32[INDEX_OFFSETS + index] = end; + // TODO(ry) rename to setMeta + function setMeta(index, end, opId) { + shared32[INDEX_OFFSETS + 2 * index] = end; + shared32[INDEX_OFFSETS + 2 * index + 1] = opId; } - function getEnd(index) { + function getMeta(index) { if (index < numRecords()) { - return shared32[INDEX_OFFSETS + index]; + const buf = shared32[INDEX_OFFSETS + 2 * index]; + const opId = shared32[INDEX_OFFSETS + 2 * index + 1]; + return [opId, buf]; } else { return null; } @@ -101,14 +105,14 @@ SharedQueue Binary Layout if (index == 0) { return HEAD_INIT; } else { - return shared32[INDEX_OFFSETS + index - 1]; + return shared32[INDEX_OFFSETS + 2 * (index - 1)]; } } else { return null; } } - function push(buf) { + function push(opId, buf) { let off = head(); let end = off + buf.byteLength; let index = numRecords(); @@ -116,7 +120,7 @@ SharedQueue Binary Layout // console.log("shared_queue.js push fail"); return false; } - setEnd(index, end); + setMeta(index, end, opId); assert(end - off == buf.byteLength); sharedBytes.set(buf, off); shared32[INDEX_NUM_RECORDS] += 1; @@ -132,8 +136,8 @@ SharedQueue Binary Layout return null; } - let off = getOffset(i); - let end = getEnd(i); + const off = getOffset(i); + const [opId, end] = getMeta(i); if (size() > 1) { shared32[INDEX_NUM_SHIFTED_OFF] += 1; @@ -143,7 +147,8 @@ SharedQueue Binary Layout assert(off != null); assert(end != null); - return sharedBytes.subarray(off, end); + const buf = sharedBytes.subarray(off, end); + return [opId, buf]; } let asyncHandler; @@ -153,19 +158,24 @@ SharedQueue Binary Layout asyncHandler = cb; } - function handleAsyncMsgFromRust(buf) { + function handleAsyncMsgFromRust(opId, buf) { if (buf) { - asyncHandler(buf); + // This is the overflow_response case of deno::Isolate::poll(). + asyncHandler(opId, buf); } else { - while ((buf = shift()) != null) { - asyncHandler(buf); + while (true) { + let opIdBuf = shift(); + if (opIdBuf == null) { + break; + } + asyncHandler(...opIdBuf); } } } - function dispatch(control, zeroCopy = null) { + function dispatch(opId, control, zeroCopy = null) { maybeInit(); - return Deno.core.send(control, zeroCopy); + return Deno.core.send(opId, control, zeroCopy); } const denoCore = { diff --git a/core/shared_queue.rs b/core/shared_queue.rs index 616272f8d90e3..11c8e21271c06 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -17,6 +17,7 @@ SharedQueue Binary Layout */ use crate::libdeno::deno_buf; +use crate::libdeno::OpId; const MAX_RECORDS: usize = 100; /// Total number of records added. @@ -27,7 +28,7 @@ const INDEX_NUM_SHIFTED_OFF: usize = 1; /// It grows monotonically. const INDEX_HEAD: usize = 2; const INDEX_OFFSETS: usize = 3; -const INDEX_RECORDS: usize = 3 + MAX_RECORDS; +const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS; /// Byte offset of where the records begin. Also where the head starts. const HEAD_INIT: usize = 4 * INDEX_RECORDS; /// A rough guess at how big we should make the shared buffer in bytes. @@ -98,16 +99,19 @@ impl SharedQueue { s[INDEX_NUM_SHIFTED_OFF] as usize } - fn set_end(&mut self, index: usize, end: usize) { + fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) { let s = self.as_u32_slice_mut(); - s[INDEX_OFFSETS + index] = end as u32; + s[INDEX_OFFSETS + 2 * index] = end as u32; + s[INDEX_OFFSETS + 2 * index + 1] = op_id; } #[cfg(test)] - fn get_end(&self, index: usize) -> Option { + fn get_meta(&self, index: usize) -> Option<(OpId, usize)> { if index < self.num_records() { let s = self.as_u32_slice(); - Some(s[INDEX_OFFSETS + index] as usize) + let end = s[INDEX_OFFSETS + 2 * index] as usize; + let op_id = s[INDEX_OFFSETS + 2 * index + 1]; + Some((op_id, end)) } else { None } @@ -120,7 +124,7 @@ impl SharedQueue { HEAD_INIT } else { let s = self.as_u32_slice(); - s[INDEX_OFFSETS + index - 1] as usize + s[INDEX_OFFSETS + 2 * (index - 1)] as usize }) } else { None @@ -129,7 +133,7 @@ impl SharedQueue { /// Returns none if empty. #[cfg(test)] - pub fn shift(&mut self) -> Option<&[u8]> { + pub fn shift(&mut self) -> Option<(OpId, &[u8])> { let u32_slice = self.as_u32_slice(); let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize; if self.size() == 0 { @@ -138,7 +142,7 @@ impl SharedQueue { } let off = self.get_offset(i).unwrap(); - let end = self.get_end(i).unwrap(); + let (op_id, end) = self.get_meta(i).unwrap(); if self.size() > 1 { let u32_slice = self.as_u32_slice_mut(); @@ -146,16 +150,16 @@ impl SharedQueue { } else { self.reset(); } - debug!( + println!( "rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}", self.num_records(), self.num_shifted_off(), self.head() ); - Some(&self.bytes[off..end]) + Some((op_id, &self.bytes[off..end])) } - pub fn push(&mut self, record: &[u8]) -> bool { + pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool { let off = self.head(); let end = off + record.len(); let index = self.num_records(); @@ -163,7 +167,7 @@ impl SharedQueue { debug!("WARNING the sharedQueue overflowed"); return false; } - self.set_end(index, end); + self.set_meta(index, end, op_id); assert_eq!(end - off, record.len()); self.bytes[off..end].copy_from_slice(record); let u32_slice = self.as_u32_slice_mut(); @@ -193,28 +197,28 @@ mod tests { let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice(); let len = r.len() + h; - assert!(q.push(&r)); + assert!(q.push(0, &r)); assert_eq!(q.head(), len); let r = vec![6, 7].into_boxed_slice(); - assert!(q.push(&r)); + assert!(q.push(0, &r)); let r = vec![8, 9, 10, 11].into_boxed_slice(); - assert!(q.push(&r)); + assert!(q.push(0, &r)); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 3); - let r = q.shift().unwrap(); + let (_op_id, r) = q.shift().unwrap(); assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 2); - let r = q.shift().unwrap(); + let (_op_id, r) = q.shift().unwrap(); assert_eq!(r, vec![6, 7].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 1); - let r = q.shift().unwrap(); + let (_op_id, r) = q.shift().unwrap(); assert_eq!(r, vec![8, 9, 10, 11].as_slice()); assert_eq!(q.num_records(), 0); assert_eq!(q.size(), 0); @@ -235,19 +239,21 @@ mod tests { #[test] fn overflow() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); - assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1))); + assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1))); assert_eq!(q.size(), 1); - assert!(!q.push(&alloc_buf(2))); + assert!(!q.push(0, &alloc_buf(2))); assert_eq!(q.size(), 1); - assert!(q.push(&alloc_buf(1))); + assert!(q.push(0, &alloc_buf(1))); assert_eq!(q.size(), 2); - assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1); + let (_op_id, buf) = q.shift().unwrap(); + assert_eq!(buf.len(), RECOMMENDED_SIZE - 1); assert_eq!(q.size(), 1); - assert!(!q.push(&alloc_buf(1))); + assert!(!q.push(0, &alloc_buf(1))); - assert_eq!(q.shift().unwrap().len(), 1); + let (_op_id, buf) = q.shift().unwrap(); + assert_eq!(buf.len(), 1); assert_eq!(q.size(), 0); } @@ -255,11 +261,11 @@ mod tests { fn full_records() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); for _ in 0..MAX_RECORDS { - assert!(q.push(&alloc_buf(1))) + assert!(q.push(0, &alloc_buf(1))) } - assert_eq!(q.push(&alloc_buf(1)), false); + assert_eq!(q.push(0, &alloc_buf(1)), false); // Even if we shift one off, we still cannot push a new record. - assert_eq!(q.shift().unwrap().len(), 1); - assert_eq!(q.push(&alloc_buf(1)), false); + let _ignored = q.shift().unwrap(); + assert_eq!(q.push(0, &alloc_buf(1)), false); } } diff --git a/core/shared_queue_test.js b/core/shared_queue_test.js index e2597f3bc31ef..682d41d1e299c 100644 --- a/core/shared_queue_test.js +++ b/core/shared_queue_test.js @@ -11,14 +11,15 @@ function fullRecords(q) { q.reset(); const oneByte = new Uint8Array([42]); for (let i = 0; i < q.MAX_RECORDS; i++) { - assert(q.push(oneByte)); + assert(q.push(99, oneByte)); } - assert(!q.push(oneByte)); - r = q.shift(); + assert(!q.push(99, oneByte)); + const [opId, r] = q.shift(); + assert(opId == 99); assert(r.byteLength == 1); assert(r[0] == 42); // Even if we shift one off, we still cannot push a new record. - assert(!q.push(oneByte)); + assert(!q.push(99, oneByte)); } function main() { @@ -29,18 +30,19 @@ function main() { let r = new Uint8Array([1, 2, 3, 4, 5]); let len = r.byteLength + h; - assert(q.push(r)); + assert(q.push(99, r)); assert(q.head() == len); r = new Uint8Array([6, 7]); - assert(q.push(r)); + assert(q.push(99, r)); r = new Uint8Array([8, 9, 10, 11]); - assert(q.push(r)); + assert(q.push(99, r)); assert(q.numRecords() == 3); assert(q.size() == 3); - r = q.shift(); + let opId; + [opId, r] = q.shift(); assert(r.byteLength == 5); assert(r[0] == 1); assert(r[1] == 2); @@ -50,14 +52,15 @@ function main() { assert(q.numRecords() == 3); assert(q.size() == 2); - r = q.shift(); + [opId, r] = q.shift(); assert(r.byteLength == 2); assert(r[0] == 6); assert(r[1] == 7); assert(q.numRecords() == 3); assert(q.size() == 1); - r = q.shift(); + [opId, r] = q.shift(); + assert(opId == 99); assert(r.byteLength == 4); assert(r[0] == 8); assert(r[1] == 9); diff --git a/js/dispatch.ts b/js/dispatch.ts index cd11c93f6c6ca..babea5739d3b5 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -10,6 +10,11 @@ import { handleAsyncMsgFromRustMinimal } from "./dispatch_minimal"; +// TODO(ry) Currently we only use three values for opId: OP_READ, OP_WRITE, +// FLATBUFFER_OP_ID. Later on use opId for actual individual ops, not just +// classes of ops. +const FLATBUFFER_OP_ID = 44; + const promiseTable = new Map>(); interface FlatbufferRecord { @@ -26,11 +31,11 @@ function flatbufferRecordFromBuf(buf: Uint8Array): FlatbufferRecord { }; } -export function handleAsyncMsgFromRust(ui8: Uint8Array): void { +export function handleAsyncMsgFromRust(opId: number, ui8: Uint8Array): void { const buf32 = new Int32Array(ui8.buffer, ui8.byteOffset, ui8.byteLength / 4); - const recordMin = recordFromBufMinimal(buf32); - if (recordMin) { + if (opId !== FLATBUFFER_OP_ID) { // Fast and new + const recordMin = recordFromBufMinimal(opId, buf32); handleAsyncMsgFromRustMinimal(ui8, recordMin); } else { // Legacy @@ -83,6 +88,7 @@ function sendInternal( const control = builder.asUint8Array(); const response = core.dispatch( + FLATBUFFER_OP_ID, // TODO(ry) Use actual opId later. control, zeroCopy ? ui8FromArrayBufferView(zeroCopy) : undefined ); diff --git a/js/dispatch_minimal.ts b/js/dispatch_minimal.ts index 17d3281107a02..df0a290b22f80 100644 --- a/js/dispatch_minimal.ts +++ b/js/dispatch_minimal.ts @@ -3,7 +3,6 @@ import * as util from "./util"; import { core } from "./core"; -const DISPATCH_MINIMAL_TOKEN = 0xcafe; const promiseTableMin = new Map>(); let _nextPromiseId = 0; @@ -13,31 +12,27 @@ export function nextPromiseId(): number { export interface RecordMinimal { promiseId: number; - opId: number; + opId: number; // Maybe better called dispatchId arg: number; result: number; } -/** Determines if a message has the "minimal" serialization format. If false, it - * is flatbuffer encoded. - */ -export function hasMinimalToken(i32: Int32Array): boolean { - return i32[0] == DISPATCH_MINIMAL_TOKEN; -} - -export function recordFromBufMinimal(buf32: Int32Array): null | RecordMinimal { - if (hasMinimalToken(buf32)) { - return { - promiseId: buf32[1], - opId: buf32[2], - arg: buf32[3], - result: buf32[4] - }; +export function recordFromBufMinimal( + opId: number, + buf32: Int32Array +): RecordMinimal { + if (buf32.length != 3) { + throw Error("Bad message"); } - return null; + return { + promiseId: buf32[0], + opId, + arg: buf32[1], + result: buf32[2] + }; } -const scratch32 = new Int32Array(5); +const scratch32 = new Int32Array(3); const scratchBytes = new Uint8Array( scratch32.buffer, scratch32.byteOffset, @@ -63,15 +58,11 @@ export function sendAsyncMinimal( zeroCopy: Uint8Array ): Promise { const promiseId = nextPromiseId(); // AKA cmdId - - scratch32[0] = DISPATCH_MINIMAL_TOKEN; - scratch32[1] = promiseId; - scratch32[2] = opId; - scratch32[3] = arg; - + scratch32[0] = promiseId; + scratch32[1] = arg; + scratch32[2] = 0; // result const promise = util.createResolvable(); promiseTableMin.set(promiseId, promise); - - core.dispatch(scratchBytes, zeroCopy); + core.dispatch(opId, scratchBytes, zeroCopy); return promise; }