Skip to content
Permalink
Browse files

sharedQueue works on deno_core_http_bench

  • Loading branch information...
ry committed Mar 15, 2019
1 parent 8ae45b2 commit dd43ad900b186d1e5d3d8bb2049b6cc1a1d028b1
Showing with 660 additions and 293 deletions.
  1. +46 −57 core/http_bench.js
  2. +24 −62 core/http_bench.rs
  3. +122 −125 core/isolate.rs
  4. +3 −0 core/lib.rs
  5. +0 −49 core/shared.rs
  6. +113 −0 core/shared_queue.js
  7. +236 −0 core/shared_queue.rs
  8. +65 −0 core/shared_queue_test.js
  9. +51 −0 core/test_util.rs
@@ -6,52 +6,11 @@ const OP_ACCEPT = 2;
const OP_READ = 3;
const OP_WRITE = 4;
const OP_CLOSE = 5;
const INDEX_START = 0;
const INDEX_END = 1;
const NUM_RECORDS = 128;
const RECORD_SIZE = 4;

const shared32 = new Int32Array(libdeno.shared);

function idx(i, off) {
return 2 + i * RECORD_SIZE + off;
}

function recordsPush(promiseId, opId, arg, result) {
let i = shared32[INDEX_END];
if (i >= NUM_RECORDS) {
return false;
function assert(cond) {
if (!cond) {
throw Error("assert");
}
shared32[idx(i, 0)] = promiseId;
shared32[idx(i, 1)] = opId;
shared32[idx(i, 2)] = arg;
shared32[idx(i, 3)] = result;
shared32[INDEX_END]++;
return true;
}

function recordsShift() {
if (shared32[INDEX_START] == shared32[INDEX_END]) {
return null;
}
const i = shared32[INDEX_START];
const record = {
promiseId: shared32[idx(i, 0)],
opId: shared32[idx(i, 1)],
arg: shared32[idx(i, 2)],
result: shared32[idx(i, 3)]
};
shared32[INDEX_START]++;
return record;
}

function recordsReset() {
shared32[INDEX_START] = 0;
shared32[INDEX_END] = 0;
}

function recordsSize() {
return shared32[INDEX_END] - shared32[INDEX_START];
}

const requestBuf = new Uint8Array(64 * 1024);
@@ -72,32 +31,62 @@ function createResolvable() {
return Object.assign(promise, methods);
}

const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength + scratch32.byteOffset
);

function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
Deno._sharedQueue.push(scratchBytes);
libdeno.send(null, zeroCopy);
}

/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopyData) {
const promiseId = nextPromiseId++;
const p = createResolvable();
recordsReset();
recordsPush(promiseId, opId, arg, -1);
promiseMap.set(promiseId, p);
send(promiseId, opId, arg, zeroCopyData);
libdeno.send(null, zeroCopyData);
return p;
}

function recv() {
const buf = Deno._sharedQueue.shift();
if (!buf) {
return null;
}
const buf32 = new Int32Array(
buf.buffer,
buf.byteOffset,
buf.byteOffset + buf.byteOffset + buf.byteLength
);
return {
promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
};
}

/** Returns u32 number */
function sendSync(opId, arg) {
recordsReset();
recordsPush(0, opId, arg, -1);
libdeno.send();
if (recordsSize() != 1) {
throw Error("Expected sharedSimple to have size 1");
}
let { result } = recordsShift();
return result;
send(0, opId, arg);
const record = recv();
assert(recv() == null);
return record.result;
}

function handleAsyncMsgFromRust() {
while (recordsSize() > 0) {
const { promiseId, result } = recordsShift();
let record;
while ((record = recv()) != null) {
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
@@ -152,7 +141,7 @@ async function main() {
libdeno.print("http_bench.js start\n");

const listenerRid = listen();
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}`);
libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`);
while (true) {
const rid = await accept(listenerRid);
// libdeno.print(`accepted ${rid}`);
@@ -16,7 +16,6 @@ use deno_core::*;
use futures::future::lazy;
use std::collections::HashMap;
use std::env;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -29,12 +28,6 @@ const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;

const INDEX_START: usize = 0;
const INDEX_END: usize = 1;

const NUM_RECORDS: usize = 128;
const RECORD_SIZE: usize = 4;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
@@ -52,6 +45,19 @@ impl Into<Buf> for Record {
}
}

impl From<&[u8]> for Record {
fn from(s: &[u8]) -> Record {
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
Record {
promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
}
}
}

impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
assert_eq!(buf.len(), 4 * 4);
@@ -85,46 +91,35 @@ fn test_record_from() {
);
let actual = Record::from(buf);
assert_eq!(actual, expected);

// TODO test From<&[u8]> for Record
}

pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;

struct HttpBench {
shared32: Vec<i32>,
}
struct HttpBench();

impl HttpBench {
fn new() -> Self {
let mut shared32 = Vec::<i32>::new();
let n = 2 + 4 * NUM_RECORDS;
shared32.resize(n, 0);
shared32[INDEX_START] = 0;
shared32[INDEX_END] = 0;
Self { shared32 }
Self()
}
}

fn idx(i: usize, off: usize) -> usize {
2 + i * RECORD_SIZE + off
}

impl Behavior for HttpBench {
fn startup_snapshot(&mut self) -> Option<deno_buf> {
None
}

fn startup_shared(&mut self) -> Option<deno_buf> {
let ptr = self.shared32.as_ptr() as *const u8;
let len = mem::size_of::<i32>() * self.shared32.len();
Some(unsafe { deno_buf::from_raw_parts(ptr, len) })
}

fn resolve(&mut self, _specifier: &str, _referrer: deno_mod) -> deno_mod {
// HttpBench doesn't do ES modules.
unimplemented!()
}

fn recv(&mut self, control: Buf, zero_copy_buf: deno_buf) -> (bool, Box<Op>) {
fn dispatch(
&mut self,
control: &[u8],
zero_copy_buf: deno_buf,
) -> (bool, Box<Op>) {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
@@ -173,41 +168,6 @@ impl Behavior for HttpBench {
);
(is_sync, op)
}

fn records_reset(&mut self) {
self.shared32[INDEX_START] = 0;
self.shared32[INDEX_END] = 0;
}

fn records_push(&mut self, buf: Buf) -> bool {
let record = Record::from(buf);
debug!("push {:?}", record);
let i = self.shared32[INDEX_END] as usize;
if i >= NUM_RECORDS {
return false;
}
self.shared32[idx(i, 0)] = record.promise_id;
self.shared32[idx(i, 1)] = record.op_id;
self.shared32[idx(i, 2)] = record.arg;
self.shared32[idx(i, 3)] = record.result;
self.shared32[INDEX_END] += 1;
true
}

fn records_shift(&mut self) -> Option<Buf> {
let i = self.shared32[INDEX_START] as usize;
if i == self.shared32[INDEX_END] as usize {
return None;
}
let record = Record {
promise_id: self.shared32[idx(i, 0)],
op_id: self.shared32[idx(i, 1)],
arg: self.shared32[idx(i, 2)],
result: self.shared32[idx(i, 3)],
};
self.shared32[INDEX_START] += 1;
Some(record.into())
}
}

fn main() {
@@ -216,6 +176,8 @@ fn main() {
let main_future = lazy(move || {
let isolate = deno_core::Isolate::new(HttpBench::new());

isolate.shared_init();

// TODO currently isolate.execute() must be run inside tokio, hence the
// lazy(). It would be nice to not have that contraint. Probably requires
// using v8::MicrotasksPolicy::kExplicit
Oops, something went wrong.

0 comments on commit dd43ad9

Please sign in to comment.
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.