Skip to content
Permalink
Browse files

sharedQueue works on deno_core_http_bench

  • Loading branch information...
ry committed Mar 15, 2019
1 parent fecc98c commit 66a491be057bdb3c239c3d53723611d41646a4e4
Showing with 35 additions and 25 deletions.
  1. +22 −22 core/http_bench.js
  2. +13 −3 core/http_bench.rs
@@ -35,18 +35,33 @@ function createResolvable() {
return Object.assign(promise, methods);
}

const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength + scratch32.byteOffset
);

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
Deno._sharedQueue.push(scratchBytes);
libdeno.send(null, zeroCopy);
}

/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopyData) {
const promiseId = nextPromiseId++;
const p = createResolvable();
recordsReset();
recordsPush(promiseId, opId, arg, -1);
promiseMap.set(promiseId, p);
send(promiseId, opId, arg, zeroCopyData);
libdeno.send(null, zeroCopyData);
return p;
}

function recvResult() {
function recv() {
const buf = Deno._sharedQueue.shift();
if (!buf) {
return null;
@@ -64,31 +79,16 @@ function recvResult() {
};
}

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
Deno._sharedQueue.push(scratchBytes);
libdeno.send(null, zeroCopy);
}

const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength + scratch32.byteOffset
);

/** Returns u32 number */
function sendSync(opId, arg) {
send(0, opId, arg);
return recvResult().result;
return recv().result;
}

function handleAsyncMsgFromRust() {
while (recordsSize() > 0) {
const { promiseId, result } = recordsShift();
let record;
while ((record = recv()) != null) {
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
@@ -44,9 +44,17 @@ impl Into<Buf> for Record {
unsafe { Box::from_raw(ptr) }
}
}

impl From<&[u8]> for Record {
fn from(_s: &[u8]) -> Record {
unimplemented!()
fn from(s: &[u8]) -> Record {
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
}
}
}

@@ -83,6 +91,8 @@ fn test_record_from() {
);
let actual = Record::from(buf);
assert_eq!(actual, expected);

// TODO test From<&[u8]> for Record
}

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
@@ -105,7 +115,7 @@ impl Behavior for HttpBench {
unimplemented!()
}

fn recv(
fn dispatch(
&mut self,
control: &[u8],
zero_copy_buf: deno_buf,

0 comments on commit 66a491b

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.