Skip to content
Permalink
Browse files

wip

  • Loading branch information...
ry committed Mar 15, 2019
1 parent 6352a67 commit 99a270d8d0e6c6c5313d3b671fb42a82ed20c685
Showing with 507 additions and 112 deletions.
  1. +37 −112 core/isolate.rs
  2. +3 −0 core/lib.rs
  3. +110 −0 core/shared_queue.js
  4. +234 −0 core/shared_queue.rs
  5. +65 −0 core/shared_queue_test.js
  6. +58 −0 core/test_util.rs
@@ -3,6 +3,8 @@ use crate::js_errors::JSError;
use crate::libdeno;
use crate::libdeno::deno_buf;
use crate::libdeno::deno_mod;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use futures::Async;
use futures::Future;
use futures::Poll;
@@ -46,30 +48,12 @@ pub trait Behavior {
/// snapshot.
fn startup_snapshot(&mut self) -> Option<deno_buf>;

/// Called exactly once when an Isolate is created to provide the
/// backing memory for the libdeno.shared SharedArrayBuffer.
fn startup_shared(&mut self) -> Option<deno_buf>;

/// Called during mod_instantiate() to resolve imports.
fn resolve(&mut self, specifier: &str, referrer: deno_mod) -> deno_mod;

/// Called whenever libdeno.send() is called in JavaScript. zero_copy_buf
/// corresponds to the second argument of libdeno.send().
fn recv(&mut self, record: Buf, zero_copy_buf: deno_buf) -> (bool, Box<Op>);

// TODO(ry) Remove records_reset().
// TODO(ry) Abstract records_* and startup_shared() methods into standalone
// trait called Shared. It should, however, wait until integration with
// existing Deno codebase is complete.

/// Clears the shared buffer.
fn records_reset(&mut self);

/// Returns false if not enough room.
fn records_push(&mut self, record: Buf) -> bool;

/// Returns none if empty.
fn records_shift(&mut self) -> Option<Buf>;
fn recv(&mut self, control: &[u8], zero_copy_buf: deno_buf) -> (bool, Box<Op>);
}

/// A single execution context of JavaScript. Corresponds roughly to the "Web
@@ -83,6 +67,7 @@ pub trait Behavior {
pub struct Isolate<B: Behavior> {
libdeno_isolate: *const libdeno::isolate,
behavior: B,
shared: SharedQueue,
pending_ops: Vec<PendingOp>,
polled_recently: bool,
}
@@ -103,45 +88,57 @@ impl<B: Behavior> Isolate<B> {
unsafe { libdeno::deno_init() };
});

let shared = SharedQueue::new(RECOMMENDED_SIZE);

let config = libdeno::deno_config {
will_snapshot: 0,
load_snapshot: match behavior.startup_snapshot() {
Some(s) => s,
None => libdeno::deno_buf::empty(),
},
shared: match behavior.startup_shared() {
Some(s) => s,
None => libdeno::deno_buf::empty(),
},
shared: shared.as_deno_buf(),
recv_cb: Self::pre_dispatch,
};
let libdeno_isolate = unsafe { libdeno::deno_new(config) };

Self {
libdeno_isolate,
behavior,
shared,
pending_ops: Vec::new(),
polled_recently: false,
}
}

extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_buf: deno_buf,
control_argv0: deno_buf,
zero_copy_buf: deno_buf,
) {
let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) };
assert_eq!(control_buf.len(), 0);
let zero_copy_id = zero_copy_buf.zero_copy_id;

let req_record = isolate.behavior.records_shift().unwrap();
let control_shared = isolate.shared.shift();

let (is_sync, op) = if control_argv0.len() > 0 {
// The user called libdeno.send(control)
assert!(control_shared.is_none());
isolate.behavior.recv(control_argv0.as_ref(), zero_copy_buf)
} else if let Some(c) = control_shared {
// The user called Deno._sharedQueue.push(control)
// At this point the SharedQueue should be empty.
assert!(isolate.shared.shift().is_none());
isolate.behavior.recv(&c, zero_copy_buf)
} else {
// The sharedQueue is empty. The shouldn't happen usually, but it's also
// not technically a failure.
return;
};

isolate.behavior.records_reset();

let (is_sync, op) = isolate.behavior.recv(req_record, zero_copy_buf);
if is_sync {
let res_record = op.wait().unwrap();
let push_success = isolate.behavior.records_push(res_record);
let push_success = isolate.shared.push(res_record);
assert!(push_success);
// TODO check that if JSError thrown during respond(), that it will be
// picked up.
@@ -342,7 +339,7 @@ impl<B: Behavior> Future for Isolate<B> {

self.polled_recently = true;

self.behavior.records_reset();
self.shared.reset();

let mut i = 0;
while i != self.pending_ops.len() {
@@ -360,7 +357,7 @@ impl<B: Behavior> Future for Isolate<B> {
self.zero_copy_release(completed.zero_copy_id);
}

self.behavior.records_push(record);
self.shared.push(record);
}
}
}
@@ -389,82 +386,7 @@ impl<B: Behavior> Future for Isolate<B> {
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;

fn js_check(r: Result<(), JSError>) {
if let Err(e) = r {
panic!(e.to_string());
}
}

struct TestBehavior {
recv_count: usize,
resolve_count: usize,
push_count: usize,
shift_count: usize,
reset_count: usize,
mod_map: HashMap<String, deno_mod>,
}

impl TestBehavior {
fn new() -> Self {
Self {
recv_count: 0,
resolve_count: 0,
push_count: 0,
shift_count: 0,
reset_count: 0,
mod_map: HashMap::new(),
}
}

fn register(&mut self, name: &str, id: deno_mod) {
self.mod_map.insert(name.to_string(), id);
}
}

impl Behavior for TestBehavior {
fn startup_snapshot(&mut self) -> Option<deno_buf> {
None
}

fn startup_shared(&mut self) -> Option<deno_buf> {
None
}

fn recv(
&mut self,
_record: Buf,
_zero_copy_buf: deno_buf,
) -> (bool, Box<Op>) {
self.recv_count += 1;
let buf = vec![42u8].into_boxed_slice();
(false, Box::new(futures::future::ok(buf)))
}

fn resolve(&mut self, specifier: &str, _referrer: deno_mod) -> deno_mod {
self.resolve_count += 1;
match self.mod_map.get(specifier) {
Some(id) => *id,
None => 0,
}
}

fn records_reset(&mut self) {
self.reset_count += 1;
}

fn records_push(&mut self, _record: Buf) -> bool {
self.push_count += 1;
true
}

fn records_shift(&mut self) -> Option<Buf> {
self.shift_count += 1;
let buf = Box::new([42]);
Some(buf)
}
}
use crate::test_util::*;

#[test]
fn test_recv() {
@@ -473,9 +395,10 @@ mod tests {
js_check(isolate.execute(
"filename.js",
r#"
libdeno.send();
let control = new Uint8Array([42]);
libdeno.send(control);
async function main() {
libdeno.send();
libdeno.send(control);
}
main();
"#,
@@ -494,7 +417,8 @@ mod tests {
r#"
import { b } from 'b.js'
if (b() != 'b') throw Error();
libdeno.send();
let control = new Uint8Array([42]);
libdeno.send(control);
"#,
).unwrap();
assert_eq!(isolate.behavior.recv_count, 0);
@@ -547,7 +471,8 @@ mod tests {
"check1.js",
r#"
assertEq(nrecv, 0);
libdeno.send();
let control = new Uint8Array([42]);
libdeno.send(control);
assertEq(nrecv, 0);
"#,
));
@@ -558,7 +483,7 @@ mod tests {
"check2.js",
r#"
assertEq(nrecv, 1);
libdeno.send();
libdeno.send(control);
assertEq(nrecv, 1);
"#,
));
@@ -8,6 +8,9 @@ mod flags;
mod isolate;
mod js_errors;
mod libdeno;
mod shared_queue;
#[cfg(test)]
mod test_util;

pub use crate::flags::v8_set_flags;
pub use crate::isolate::*;
@@ -0,0 +1,110 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
(() => {
const MAX_RECORDS = 100;
const INDEX_NUM_RECORDS = 0;
const INDEX_NUM_SHIFTED_OFF = 1;
const INDEX_HEAD = 2;
const INDEX_OFFSETS = 3;
const INDEX_RECORDS = 3 + MAX_RECORDS;
const HEAD_INIT = 4 * INDEX_RECORDS;

let sharedBytes = null;
let shared32 = null;

const window = (0, eval)("this");

if (!window["Deno"]) {
window["Deno"] = {};
}

function assert(cond) {
if (!cond) {
throw Error("assert");
}
}

function init(shared) {
assert(sharedBytes == null);
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
}

function reset() {
shared32.fill(0, 0, INDEX_RECORDS);
shared32[INDEX_HEAD] = HEAD_INIT;
}

function push(buf) {
let off = head();
let end = off + buf.byteLength;
let index = numRecords();
if (end > shared32.byteLength) {
console.log("shared_queue.ts push fail");
return false;
}
setEnd(index, end);
assert(end - off == buf.byteLength);
sharedBytes.set(buf, off);
shared32[INDEX_NUM_RECORDS] += 1;
shared32[INDEX_HEAD] = end;
return true;
}

/// Returns null if empty.
function shift() {
let i = shared32[INDEX_NUM_SHIFTED_OFF];
if (i >= numRecords()) {
return null;
}
let off = getOffset(i);
let end = getEnd(i);
shared32[INDEX_NUM_SHIFTED_OFF] += 1;
return sharedBytes.subarray(off, end);
}

function head() {
return shared32[INDEX_HEAD];
}

function setEnd(index, end) {
shared32[INDEX_OFFSETS + index] = end;
}

function getEnd(index) {
if (index < numRecords()) {
return shared32[INDEX_OFFSETS + index];
} else {
return null;
}
}

function getOffset(index) {
if (index < numRecords()) {
if (index == 0) {
return HEAD_INIT;
} else {
return shared32[INDEX_OFFSETS + index - 1];
}
} else {
return null;
}
}

function numRecords() {
return shared32[INDEX_NUM_RECORDS];
}

function activeRecords() {
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}

window.Deno._sharedQueue = {
init,
head,
numRecords,
activeRecords,
push,
shift
};
})();
Oops, something went wrong.

0 comments on commit 99a270d

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.