Skip to content
Permalink
Browse files

Add sharedQueue tests

  • Loading branch information...
ry committed Mar 15, 2019
1 parent bf503c6 commit fecc98c8c307da8660df174c3beff8397ca1d357
Showing with 132 additions and 135 deletions.
  1. +40 −49 core/http_bench.js
  2. +14 −62 core/http_bench.rs
  3. +69 −15 core/isolate.rs
  4. +3 −1 core/shared_queue.js
  5. +1 −1 core/shared_queue.rs
  6. +0 −2 core/shared_queue_test.js
  7. +5 −5 core/test_util.rs
@@ -11,47 +11,10 @@ const INDEX_END = 1;
const NUM_RECORDS = 128;
const RECORD_SIZE = 4;

const shared32 = new Int32Array(libdeno.shared);

function idx(i, off) {
return 2 + i * RECORD_SIZE + off;
}

function recordsPush(promiseId, opId, arg, result) {
let i = shared32[INDEX_END];
if (i >= NUM_RECORDS) {
return false;
}
shared32[idx(i, 0)] = promiseId;
shared32[idx(i, 1)] = opId;
shared32[idx(i, 2)] = arg;
shared32[idx(i, 3)] = result;
shared32[INDEX_END]++;
return true;
}

function recordsShift() {
if (shared32[INDEX_START] == shared32[INDEX_END]) {
return null;
function assert(cond) {
if (!cond) {
throw Error("assert");
}
const i = shared32[INDEX_START];
const record = {
promiseId: shared32[idx(i, 0)],
opId: shared32[idx(i, 1)],
arg: shared32[idx(i, 2)],
result: shared32[idx(i, 3)]
};
shared32[INDEX_START]++;
return record;
}

function recordsReset() {
shared32[INDEX_START] = 0;
shared32[INDEX_END] = 0;
}

function recordsSize() {
return shared32[INDEX_END] - shared32[INDEX_START];
}

const requestBuf = new Uint8Array(64 * 1024);
@@ -83,16 +46,44 @@ function sendAsync(opId, arg, zeroCopyData) {
return p;
}

function recvResult() {
const buf = Deno._sharedQueue.shift();
if (!buf) {
return null;
}
const buf32 = new Int32Array(
buf.buffer,
buf.byteOffset,
buf.byteOffset + buf.byteOffset + buf.byteLength
);
return {
promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
};
}

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
Deno._sharedQueue.push(scratchBytes);
libdeno.send(null, zeroCopy);
}

const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength + scratch32.byteOffset
);

/** Returns u32 number */
function sendSync(opId, arg) {
recordsReset();
recordsPush(0, opId, arg, -1);
libdeno.send();
if (recordsSize() != 1) {
throw Error("Expected sharedSimple to have size 1");
}
let { result } = recordsShift();
return result;
send(0, opId, arg);
return recvResult().result;
}

function handleAsyncMsgFromRust() {
@@ -152,7 +143,7 @@ async function main() {
libdeno.print("http_bench.js start\n");

const listenerRid = listen();
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}`);
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`);
while (true) {
const rid = await accept(listenerRid);
// libdeno.print(`accepted ${rid}`);
@@ -16,7 +16,6 @@ use deno_core::*;
use futures::future::lazy;
use std::collections::HashMap;
use std::env;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -29,12 +28,6 @@ const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;

const INDEX_START: usize = 0;
const INDEX_END: usize = 1;

const NUM_RECORDS: usize = 128;
const RECORD_SIZE: usize = 4;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
@@ -51,6 +44,11 @@ impl Into<Buf> for Record {
unsafe { Box::from_raw(ptr) }
}
}
impl From<&[u8]> for Record {
fn from(_s: &[u8]) -> Record {
unimplemented!()
}
}

impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
@@ -89,42 +87,29 @@ fn test_record_from() {

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

struct HttpBench {
shared32: Vec<i32>,
}
struct HttpBench();

impl HttpBench {
fn new() -> Self {
let mut shared32 = Vec::<i32>::new();
let n = 2 + 4 * NUM_RECORDS;
shared32.resize(n, 0);
shared32[INDEX_START] = 0;
shared32[INDEX_END] = 0;
Self { shared32 }
Self()
}
}

fn idx(i: usize, off: usize) -> usize {
2 + i * RECORD_SIZE + off
}

impl Behavior for HttpBench {
fn startup_snapshot(&mut self) -> Option<deno_buf> {
None
}

fn startup_shared(&mut self) -> Option<deno_buf> {
let ptr = self.shared32.as_ptr() as *const u8;
let len = mem::size_of::<i32>() * self.shared32.len();
Some(unsafe { deno_buf::from_raw_parts(ptr, len) })
}

fn resolve(&mut self, _specifier: &str, _referrer: deno_mod) -> deno_mod {
// HttpBench doesn't do ES modules.
unimplemented!()
}

fn recv(&mut self, control: Buf, zero_copy_buf: deno_buf) -> (bool, Box<Op>) {
fn recv(
&mut self,
control: &[u8],
zero_copy_buf: deno_buf,
) -> (bool, Box<Op>) {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
@@ -173,41 +158,6 @@ impl Behavior for HttpBench {
);
(is_sync, op)
}

fn records_reset(&mut self) {
self.shared32[INDEX_START] = 0;
self.shared32[INDEX_END] = 0;
}

fn records_push(&mut self, buf: Buf) -> bool {
let record = Record::from(buf);
debug!("push {:?}", record);
let i = self.shared32[INDEX_END] as usize;
if i >= NUM_RECORDS {
return false;
}
self.shared32[idx(i, 0)] = record.promise_id;
self.shared32[idx(i, 1)] = record.op_id;
self.shared32[idx(i, 2)] = record.arg;
self.shared32[idx(i, 3)] = record.result;
self.shared32[INDEX_END] += 1;
true
}

fn records_shift(&mut self) -> Option<Buf> {
let i = self.shared32[INDEX_START] as usize;
if i == self.shared32[INDEX_END] as usize {
return None;
}
let record = Record {
promise_id: self.shared32[idx(i, 0)],
op_id: self.shared32[idx(i, 1)],
arg: self.shared32[idx(i, 2)],
result: self.shared32[idx(i, 3)],
};
self.shared32[INDEX_START] += 1;
Some(record.into())
}
}

fn main() {
@@ -216,6 +166,8 @@ fn main() {
let main_future = lazy(move || {
let isolate = deno_core::Isolate::new(HttpBench::new());

isolate.shared_init();

// TODO currently isolate.execute() must be run inside tokio, hence the
// lazy(). It would be nice to not have that contraint. Probably requires
// using v8::MicrotasksPolicy::kExplicit
Oops, something went wrong.

0 comments on commit fecc98c

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.