Skip to content
Permalink
Browse files

setAsyncHandler

  • Loading branch information...
ry committed Mar 15, 2019
1 parent cffe1d7 commit cb3ccb90425af813581485dec6849eca7688e9f7
Showing with 49 additions and 39 deletions.
  1. +18 −20 core/http_bench.js
  2. +7 −11 core/isolate.rs
  3. +24 −8 core/shared_queue.js
@@ -35,7 +35,7 @@ const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteOffset + scratch32.byteLength
scratch32.byteLength
);
assert(scratchBytes.byteLength === 4 * 4);

@@ -65,16 +65,8 @@ function sendAsync(opId, arg, zeroCopyData) {
return p;
}

function recv() {
const buf = Deno._sharedQueue.shift();
if (!buf) {
return null;
}
const buf32 = new Int32Array(
buf.buffer,
buf.byteOffset,
buf.byteOffset + buf.byteLength
);
function recordFromBuf(buf) {
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength);
return {
promiseId: buf32[0],
opId: buf32[1],
@@ -83,6 +75,14 @@ function recv() {
};
}

function recv() {
const buf = Deno._sharedQueue.shift();
if (!buf) {
return null;
}
return recordFromBuf(buf);
}

/** Returns u32 number */
function sendSync(opId, arg) {
send(0, opId, arg);
@@ -91,14 +91,12 @@ function sendSync(opId, arg) {
return record.result;
}

function handleAsyncMsgFromRust() {
let record;
while ((record = recv()) != null) {
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
}
function handleAsyncMsgFromRust(buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
}

/** Listens on 0.0.0.0:4500, returns rid. */
@@ -144,7 +142,7 @@ async function serve(rid) {
}

async function main() {
libdeno.recv(handleAsyncMsgFromRust);
Deno.setAsyncHandler(handleAsyncMsgFromRust);

libdeno.print("http_bench.js start\n");

@@ -373,7 +373,9 @@ impl<B: Behavior> Future for Isolate<B> {

if completed_count > 0 {
self.respond()?;
// The other side should have shifted off all the messages.
self.shared.reset();
// assert_eq!(self.shared.size(), 0);
}
}

@@ -466,11 +468,13 @@ mod tests {
let behavior = TestBehavior::new();
let mut isolate = Isolate::new(behavior);

isolate.shared_init();

js_check(isolate.execute(
"setup.js",
r#"
let nrecv = 0;
libdeno.recv(() => {
Deno.setAsyncHandler((buf) => {
nrecv++;
});
function assertEq(actual, expected) {
@@ -520,17 +524,9 @@ mod tests {
"setup.js",
r#"
let nrecv = 0;
libdeno.recv(() => {
let buf = Deno._sharedQueue.shift();
assert(buf.byteLength === 1);
assert(buf[0] === 43);
buf = Deno._sharedQueue.shift();
Deno.setAsyncHandler((buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
buf = Deno._sharedQueue.shift();
assert(buf == null);
nrecv++;
});
function assert(cond) {
@@ -558,7 +554,7 @@ mod tests {
assert_eq!(isolate.behavior.dispatch_count, 2);
assert_eq!(Ok(Async::Ready(())), isolate.poll());

js_check(isolate.execute("send1.js", "assert(nrecv === 1);"));
js_check(isolate.execute("send1.js", "assert(nrecv === 2);"));
}

}
@@ -23,14 +23,6 @@
}
}

function init(shared) {
assert(shared.byteLength > 0);
assert(sharedBytes == null);
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
}

function reset() {
shared32.fill(0, 0, INDEX_RECORDS);
shared32[INDEX_HEAD] = HEAD_INIT;
@@ -100,6 +92,30 @@
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}

let asyncHandler = null;
function setAsyncHandler(cb) {
assert(asyncHandler == null);
asyncHandler = cb;
}

function handleAsyncMsgFromRust() {
let buf;
while ((buf = shift()) != null) {
asyncHandler(buf);
}
}

function init(shared) {
assert(shared.byteLength > 0);
assert(sharedBytes == null);
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
// Callers should not call libdeno.recv, use setAsyncHandler.
libdeno.recv(handleAsyncMsgFromRust);
}

window.Deno.setAsyncHandler = setAsyncHandler;
window.Deno._sharedQueue = {
head,
numRecords,

0 comments on commit cb3ccb9

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.