Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add op_id throughout op API (denoland#2734)
Removes the magic number hack to switch between flatbuffers and the
minimal dispatcher.

Adds machinery to pass the op_id through the shared_queue.
  • Loading branch information
ry committed Aug 7, 2019
1 parent 5350abb commit e438ac2
Show file tree
Hide file tree
Showing 22 changed files with 354 additions and 251 deletions.
39 changes: 13 additions & 26 deletions cli/dispatch_minimal.rs
Expand Up @@ -8,33 +8,26 @@ use crate::state::ThreadSafeState;
use deno::Buf;
use deno::CoreOp;
use deno::Op;
use deno::OpId;
use deno::PinnedBuf;
use futures::Future;

const DISPATCH_MINIMAL_TOKEN: i32 = 0xCAFE;
const OP_READ: i32 = 1;
const OP_WRITE: i32 = 2;
const OP_READ: OpId = 1;
const OP_WRITE: OpId = 2;

#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let vec = vec![
DISPATCH_MINIMAL_TOKEN,
self.promise_id,
self.op_id,
self.arg,
self.result,
];
let vec = vec![self.promise_id, self.arg, self.result];
let buf32 = vec.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
Expand All @@ -48,32 +41,25 @@ pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
let p32 = p as *const i32;
let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };

if s.len() < 5 {
if s.len() != 3 {
return None;
}
let ptr = s.as_ptr();
let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
if ints[0] != DISPATCH_MINIMAL_TOKEN {
return None;
}
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Some(Record {
promise_id: ints[1],
op_id: ints[2],
arg: ints[3],
result: ints[4],
promise_id: ints[0],
arg: ints[1],
result: ints[2],
})
}

#[test]
fn test_parse_min_record() {
let buf = vec![
0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0,
];
let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
assert_eq!(
parse_min_record(&buf),
Some(Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
})
Expand All @@ -88,11 +74,12 @@ fn test_parse_min_record() {

pub fn dispatch_minimal(
state: &ThreadSafeState,
op_id: OpId,
mut record: Record,
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
let is_sync = record.promise_id == 0;
let min_op = match record.op_id {
let min_op = match op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
_ => unimplemented!(),
Expand Down
17 changes: 7 additions & 10 deletions cli/ops.rs
Expand Up @@ -27,14 +27,7 @@ use crate::tokio_write;
use crate::version;
use crate::worker::Worker;
use atty;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::OpResult;
use deno::PinnedBuf;
use deno::*;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
Expand Down Expand Up @@ -82,16 +75,20 @@ fn empty_buf() -> Buf {
Box::new([])
}

const FLATBUFFER_OP_ID: OpId = 44;

pub fn dispatch_all(
state: &ThreadSafeState,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
) -> CoreOp {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if let Some(min_record) = parse_min_record(control) {
dispatch_minimal(state, min_record, zero_copy)
let op = if op_id != FLATBUFFER_OP_ID {
let min_record = parse_min_record(control).unwrap();
dispatch_minimal(state, op_id, min_record, zero_copy)
} else {
dispatch_all_legacy(state, control, zero_copy, op_selector)
};
Expand Down
4 changes: 3 additions & 1 deletion cli/state.rs
Expand Up @@ -20,6 +20,7 @@ use deno::CoreOp;
use deno::ErrBox;
use deno::Loader;
use deno::ModuleSpecifier;
use deno::OpId;
use deno::PinnedBuf;
use futures::future::Shared;
use futures::Future;
Expand Down Expand Up @@ -104,10 +105,11 @@ impl Deref for ThreadSafeState {
impl ThreadSafeState {
pub fn dispatch(
&self,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
) -> CoreOp {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
ops::dispatch_all(self, op_id, control, zero_copy, self.dispatch_selector)
}
}

Expand Down
4 changes: 2 additions & 2 deletions cli/worker.rs
Expand Up @@ -29,8 +29,8 @@ impl Worker {
{
let mut i = isolate.lock().unwrap();
let state_ = state.clone();
i.set_dispatch(move |control_buf, zero_copy_buf| {
state_.dispatch(control_buf, zero_copy_buf)
i.set_dispatch(move |op_id, control_buf, zero_copy_buf| {
state_.dispatch(op_id, control_buf, zero_copy_buf)
});
let state_ = state.clone();
i.set_js_error_create(move |v8_exception| {
Expand Down
3 changes: 2 additions & 1 deletion core/core.d.ts
Expand Up @@ -5,11 +5,12 @@
// Deno and therefore do not flow through to the runtime type library.

declare interface MessageCallback {
(msg: Uint8Array): void;
(opId: number, msg: Uint8Array): void;
}

declare interface DenoCore {
dispatch(
opId: number,
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
): Uint8Array | null;
Expand Down
20 changes: 9 additions & 11 deletions core/examples/http_bench.js
Expand Up @@ -29,20 +29,19 @@ function createResolvable() {
return Object.assign(promise, methods);
}

const scratch32 = new Int32Array(4);
const scratch32 = new Int32Array(3);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength
);
assert(scratchBytes.byteLength === 4 * 4);
assert(scratchBytes.byteLength === 3 * 4);

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
return Deno.core.dispatch(scratchBytes, zeroCopy);
scratch32[1] = arg;
scratch32[2] = -1;
return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
}

/** Returns Promise<number> */
Expand All @@ -55,13 +54,12 @@ function sendAsync(opId, arg, zeroCopy = null) {
}

function recordFromBuf(buf) {
assert(buf.byteLength === 16);
assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
arg: buf32[1],
result: buf32[2]
};
}

Expand All @@ -72,7 +70,7 @@ function sendSync(opId, arg) {
return record.result;
}

function handleAsyncMsgFromRust(buf) {
function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
Expand Down
47 changes: 23 additions & 24 deletions core/examples/http_bench.rs
Expand Up @@ -36,25 +36,23 @@ impl log::Log for Logger {
fn flush(&self) {}
}

const OP_LISTEN: i32 = 1;
const OP_ACCEPT: i32 = 2;
const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;
const OP_LISTEN: OpId = 1;
const OP_ACCEPT: OpId = 2;
const OP_READ: OpId = 3;
const OP_WRITE: OpId = 4;
const OP_CLOSE: OpId = 5;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
}

impl Into<Buf> for Record {
fn into(self) -> Buf {
let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
.into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
Expand All @@ -63,28 +61,26 @@ impl From<&[u8]> for Record {
fn from(s: &[u8]) -> Record {
#[allow(clippy::cast_ptr_alignment)]
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
arg: ints[1],
result: ints[2],
}
}
}

impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
assert_eq!(buf.len(), 4 * 4);
assert_eq!(buf.len(), 3 * 4);
#[allow(clippy::cast_ptr_alignment)]
let ptr = Box::into_raw(buf) as *mut [i32; 4];
let ptr = Box::into_raw(buf) as *mut [i32; 3];
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
assert_eq!(ints.len(), 3);
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
arg: ints[1],
result: ints[2],
}
}
}
Expand All @@ -93,7 +89,6 @@ impl From<Buf> for Record {
fn test_record_from() {
let r = Record {
promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
};
Expand All @@ -102,7 +97,7 @@ fn test_record_from() {
#[cfg(target_endian = "little")]
assert_eq!(
buf,
vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
);
let actual = Record::from(buf);
assert_eq!(actual, expected);
Expand All @@ -111,10 +106,14 @@ fn test_record_from() {

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
fn dispatch(
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<PinnedBuf>,
) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
Expand All @@ -139,7 +138,7 @@ fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
_ => panic!("bad op {}", record.op_id),
_ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();
Expand Down

0 comments on commit e438ac2

Please sign in to comment.