diff --git a/Cargo.lock b/Cargo.lock index 52cece4fd12b3..8b1c76c37e050 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,12 +385,12 @@ dependencies = [ name = "deno_core" version = "0.32.0" dependencies = [ + "derive_deref 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "downcast-rs 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusty_v8 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -406,6 +406,16 @@ dependencies = [ "serde_json 1.0.45 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "derive_deref" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.44 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "dirs" version = "2.0.2" @@ -2346,6 +2356,7 @@ dependencies = [ "checksum darling 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" "checksum darling_core 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" "checksum darling_macro 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +"checksum derive_deref 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "11554fdb0aa42363a442e0c4278f51c9621e20c1ce3bac51d79e60646f3b8b8f" "checksum dirs 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "13aea89a5c93364a98e9b37b2fa237effbb694d5cfe01c5b70941f7eb087d5e3" "checksum dirs-sys 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "afa0b23de8fd801745c471deffa6e12d248f962c9fd4b4c33787b055599bde7b" "checksum dlopen 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "71e80ad39f814a9abe68583cd50a2d45c8a67561c3361ab8da240587dda80937" diff --git a/core/Cargo.toml b/core/Cargo.toml index 59a0bd2bc527e..f0d1fffc8caa1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,7 +27,7 @@ rusty_v8 = "0.2.1" name = "deno_core_http_bench" path = "examples/http_bench.rs" -# tokio is only used for deno_core_http_bench -[dev_dependencies] +# These dependendencies are only used for deno_core_http_bench. +[dev-dependencies] +derive_deref = "1.1.0" tokio = { version = "0.2", features = ["rt-core", "tcp"] } -num_cpus = "1.11.1" diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 9473b68801377..abe81e41e3334 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -36,18 +36,18 @@ const scratchBytes = new Uint8Array( ); assert(scratchBytes.byteLength === 3 * 4); -function send(promiseId, opId, arg, zeroCopy = null) { +function send(promiseId, opId, rid, zeroCopy = null) { scratch32[0] = promiseId; - scratch32[1] = arg; + scratch32[1] = rid; scratch32[2] = -1; return Deno.core.dispatch(opId, scratchBytes, zeroCopy); } /** Returns Promise */ -function sendAsync(opId, arg, zeroCopy = null) { +function sendAsync(opId, rid, zeroCopy = null) { const promiseId = nextPromiseId++; const p = createResolvable(); - const buf = send(promiseId, opId, arg, zeroCopy); + const buf = send(promiseId, opId, rid, zeroCopy); if (buf) { const record = recordFromBuf(buf); // Sync result. @@ -60,8 +60,8 @@ function sendAsync(opId, arg, zeroCopy = null) { } /** Returns i32 number */ -function sendSync(opId, arg) { - const buf = send(0, opId, arg); +function sendSync(opId, rid) { + const buf = send(0, opId, rid); const record = recordFromBuf(buf); return record[2]; } @@ -131,7 +131,7 @@ async function main() { Deno.core.print("http_bench.js start\n"); const listenerRid = listen(); - Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); + Deno.core.print(`listening http://127.0.0.1:4544/ rid=${listenerRid}\n`); while (true) { const rid = await accept(listenerRid); // Deno.core.print(`accepted ${rid}`); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index fa570acfb91cd..9b1edd9bf8bb0 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -2,32 +2,33 @@ /// /// > DENO_BUILD_MODE=release ./tools/build.py && \ /// ./target/release/deno_core_http_bench --multi-thread -extern crate deno_core; -extern crate futures; -extern crate libc; -extern crate num_cpus; -extern crate tokio; #[macro_use] -extern crate log; +extern crate derive_deref; #[macro_use] -extern crate lazy_static; +extern crate log; +use deno_core::Isolate as CoreIsolate; use deno_core::*; -use futures::future::Future; -use futures::future::FutureExt; -use futures::task::{Context, Poll}; +use futures::future::poll_fn; +use futures::prelude::*; +use futures::task::Context; +use futures::task::Poll; +use std::cell::RefCell; +use std::convert::TryInto; use std::env; +use std::fmt::Debug; use std::io::Error; use std::io::ErrorKind; +use std::mem::size_of; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::Mutex; -use std::sync::MutexGuard; +use std::ptr; +use std::rc::Rc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; - -static LOGGER: Logger = Logger; +use tokio::net::TcpListener; +use tokio::net::TcpStream; struct Logger; @@ -35,330 +36,245 @@ impl log::Log for Logger { fn enabled(&self, metadata: &log::Metadata) -> bool { metadata.level() <= log::max_level() } + fn log(&self, record: &log::Record) { if self.enabled(record.metadata()) { println!("{} - {}", record.level(), record.args()); } } + fn flush(&self) {} } -#[derive(Clone, Debug, PartialEq)] -pub struct Record { - pub promise_id: i32, - pub arg: i32, +#[derive(Copy, Clone, Debug, PartialEq)] +struct Record { + pub promise_id: u32, + pub rid: u32, pub result: i32, } -impl Into for Record { - fn into(self) -> Buf { - let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; - unsafe { Box::from_raw(ptr) } - } -} +type RecordBuf = [u8; size_of::()]; impl From<&[u8]> for Record { - fn from(s: &[u8]) -> Record { - #[allow(clippy::cast_ptr_alignment)] - let ptr = s.as_ptr() as *const i32; - let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; - Record { - promise_id: ints[0], - arg: ints[1], - result: ints[2], - } + fn from(buf: &[u8]) -> Self { + assert_eq!(buf.len(), size_of::()); + unsafe { *(buf as *const _ as *const RecordBuf) }.into() } } -impl From for Record { - fn from(buf: Buf) -> Record { - assert_eq!(buf.len(), 3 * 4); - #[allow(clippy::cast_ptr_alignment)] - let ptr = Box::into_raw(buf) as *mut [i32; 3]; - let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; - assert_eq!(ints.len(), 3); - Record { - promise_id: ints[0], - arg: ints[1], - result: ints[2], +impl From for Record { + fn from(buf: RecordBuf) -> Self { + unsafe { + #[allow(clippy::cast_ptr_alignment)] + ptr::read_unaligned(&buf as *const _ as *const Self) } } } -#[test] -fn test_record_from() { - let r = Record { - promise_id: 1, - arg: 3, - result: 4, - }; - let expected = r.clone(); - let buf: Buf = r.into(); - #[cfg(target_endian = "little")] - assert_eq!( - buf, - vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() - ); - let actual = Record::from(buf); - assert_eq!(actual, expected); - // TODO test From<&[u8]> for Record -} - -pub type HttpOp = dyn Future> + Send; - -pub type HttpOpHandler = - fn(record: Record, zero_copy_buf: Option) -> Pin>; - -fn http_op( - handler: HttpOpHandler, -) -> impl Fn(&[u8], Option) -> CoreOp { - move |control: &[u8], zero_copy_buf: Option| -> CoreOp { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let op = handler(record.clone(), zero_copy_buf); - let mut record_a = record; - - let fut = async move { - match op.await { - Ok(result) => record_a.result = result, - Err(err) => { - eprintln!("unexpected err {}", err); - record_a.result = -1; - } - }; - Ok(record_a.into()) - }; - - if is_sync { - Op::Sync(futures::executor::block_on(fut).unwrap()) - } else { - Op::Async(fut.boxed()) - } +impl From for RecordBuf { + fn from(record: Record) -> Self { + unsafe { ptr::read(&record as *const _ as *const Self) } } } -fn main() { - let args: Vec = env::args().collect(); - // NOTE: `--help` arg will display V8 help and exit - let args = deno_core::v8_set_flags(args); - - log::set_logger(&LOGGER).unwrap(); - log::set_max_level(if args.iter().any(|a| a == "-D") { - log::LevelFilter::Debug - } else { - log::LevelFilter::Warn - }); - - let js_source = include_str!("http_bench.js"); - - let startup_data = StartupData::Script(Script { - source: js_source, - filename: "http_bench.js", - }); - - let isolate = deno_core::Isolate::new(startup_data, false); - isolate.register_op("listen", http_op(op_listen)); - isolate.register_op("accept", http_op(op_accept)); - isolate.register_op("read", http_op(op_read)); - isolate.register_op("write", http_op(op_write)); - isolate.register_op("close", http_op(op_close)); - - println!( - "num cpus; logical: {}; physical: {}", - num_cpus::get(), - num_cpus::get_physical() - ); - - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap(); - let result = runtime.block_on(isolate.boxed_local()); - js_check(result); +struct Isolate { + core_isolate: Box, // Unclear why CoreIsolate::new() returns a box. + state: State, } -pub fn bad_resource() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") -} +#[derive(Clone, Default, Deref)] +struct State(Rc>); -struct TcpListener(tokio::net::TcpListener); +#[derive(Default)] +struct StateInner { + resource_table: ResourceTable, +} -impl Resource for TcpListener {} +impl Isolate { + pub fn new() -> Self { + let startup_data = StartupData::Script(Script { + source: include_str!("http_bench.js"), + filename: "http_bench.js", + }); -struct TcpStream(tokio::net::TcpStream); + let mut isolate = Self { + core_isolate: CoreIsolate::new(startup_data, false), + state: Default::default(), + }; -impl Resource for TcpStream {} + isolate.register_op("listen", op_listen); + isolate.register_op("accept", op_accept); + isolate.register_op("read", op_read); + isolate.register_op("write", op_write); + isolate.register_op("close", op_close); -lazy_static! { - static ref RESOURCE_TABLE: Mutex = - Mutex::new(ResourceTable::default()); -} + isolate + } -fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { - RESOURCE_TABLE.lock().unwrap() -} + fn register_op( + &mut self, + name: &'static str, + handler: impl Fn(State, u32, Option) -> F + Copy + 'static, + ) where + F: TryFuture, + F::Ok: TryInto, + >::Error: Debug, + { + let state = self.state.clone(); + let core_handler = + move |control_buf: &[u8], zero_copy_buf: Option| -> CoreOp { + let state = state.clone(); + let record = Record::from(control_buf); + let is_sync = record.promise_id == 0; + + let fut = async move { + let op = handler(state, record.rid, zero_copy_buf); + let result = op + .map_ok(|r| r.try_into().expect("op result does not fit in i32")) + .unwrap_or_else(|_| -1) + .await; + Ok(RecordBuf::from(Record { result, ..record })[..].into()) + }; + + if is_sync { + Op::Sync(futures::executor::block_on(fut).unwrap()) + } else { + Op::Async(fut.boxed_local()) + } + }; -struct Accept { - rid: ResourceId, + self.core_isolate.register_op(name, core_handler); + } } -impl Future for Accept { - type Output = Result<(tokio::net::TcpStream, SocketAddr), std::io::Error>; +impl Future for Isolate { + type Output = ::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - - let mut table = lock_resource_table(); - match table.get_mut::(inner.rid) { - None => Poll::Ready(Err(bad_resource())), - Some(listener) => { - let listener = &mut listener.0; - listener.poll_accept(cx) - } - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.core_isolate.poll_unpin(cx) } } -fn op_accept( - record: Record, - _zero_copy_buf: Option, -) -> Pin> { - let rid = record.arg as u32; - debug!("accept {}", rid); - - let fut = async move { - let (stream, addr) = Accept { rid }.await?; - debug!("accept success {}", addr); - let mut table = lock_resource_table(); - let rid = table.add("tcpStream", Box::new(TcpStream(stream))); - Ok(rid as i32) - }; - - fut.boxed() +fn op_close( + state: State, + rid: u32, + _buf: Option, +) -> impl TryFuture { + debug!("close rid={}", rid); + + async move { + let resource_table = &mut state.borrow_mut().resource_table; + resource_table + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource) + } } fn op_listen( - _record: Record, - _zero_copy_buf: Option, -) -> Pin> { + state: State, + _rid: u32, + _buf: Option, +) -> impl TryFuture { debug!("listen"); - let fut = async { + + async move { let addr = "127.0.0.1:4544".parse::().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).await?; - let mut table = lock_resource_table(); - let rid = table.add("tcpListener", Box::new(TcpListener(listener))); - Ok(rid as i32) - }; - - fut.boxed() -} - -fn op_close( - record: Record, - _zero_copy_buf: Option, -) -> Pin> { - debug!("close"); - let fut = async move { - let rid = record.arg as u32; - let mut table = lock_resource_table(); - match table.close(rid) { - Some(_) => Ok(0), - None => Err(bad_resource()), - } - }; - fut.boxed() -} - -struct Read { - rid: ResourceId, - buf: ZeroCopyBuf, + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add("tcpListener", Box::new(listener)); + Ok(rid) + } } -impl Future for Read { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut table = lock_resource_table(); - - match table.get_mut::(inner.rid) { - None => Poll::Ready(Err(bad_resource())), - Some(stream) => { - let pinned_stream = Pin::new(&mut stream.0); - pinned_stream.poll_read(cx, &mut inner.buf) - } - } - } +fn op_accept( + state: State, + rid: u32, + _buf: Option, +) -> impl TryFuture { + debug!("accept rid={}", rid); + + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let listener = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + listener.poll_accept(cx).map_ok(|(stream, _addr)| { + resource_table.add("tcpStream", Box::new(stream)) + }) + }) } fn op_read( - record: Record, - zero_copy_buf: Option, -) -> Pin> { - let rid = record.arg as u32; + state: State, + rid: u32, + buf: Option, +) -> impl TryFuture { + let mut buf = buf.unwrap(); debug!("read rid={}", rid); - let zero_copy_buf = zero_copy_buf.unwrap(); - - let fut = async move { - let nread = Read { - rid, - buf: zero_copy_buf, - } - .await?; - debug!("read success {}", nread); - Ok(nread as i32) - }; - - fut.boxed() -} -struct Write { - rid: ResourceId, - buf: ZeroCopyBuf, + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let stream = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream).poll_read(cx, &mut buf) + }) } -impl Future for Write { - type Output = Result; +fn op_write( + state: State, + rid: u32, + buf: Option, +) -> impl TryFuture { + let buf = buf.unwrap(); + debug!("write rid={}", rid); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut table = lock_resource_table(); + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let stream = resource_table + .get_mut::(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream).poll_write(cx, &buf) + }) +} - match table.get_mut::(inner.rid) { - None => Poll::Ready(Err(bad_resource())), - Some(stream) => { - let pinned_stream = Pin::new(&mut stream.0); - pinned_stream.poll_write(cx, &inner.buf) - } - } - } +fn bad_resource() -> Error { + Error::new(ErrorKind::NotFound, "bad resource id") } -fn op_write( - record: Record, - zero_copy_buf: Option, -) -> Pin> { - let rid = record.arg as u32; - debug!("write rid={}", rid); - let zero_copy_buf = zero_copy_buf.unwrap(); +fn main() { + log::set_logger(&Logger).unwrap(); + log::set_max_level( + env::args() + .find(|a| a == "-D") + .map(|_| log::LevelFilter::Debug) + .unwrap_or(log::LevelFilter::Warn), + ); - let fut = async move { - let nwritten = Write { - rid, - buf: zero_copy_buf, - } - .await?; - debug!("write success {}", nwritten); - Ok(nwritten as i32) - }; + // NOTE: `--help` arg will display V8 help and exit + deno_core::v8_set_flags(env::args().collect()); - fut.boxed() + let isolate = Isolate::new(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + runtime.block_on(isolate).expect("unexpected isolate error"); } -fn js_check(r: Result<(), ErrBox>) { - if let Err(e) = r { - panic!(e.to_string()); +#[test] +fn test_record_from() { + let expected = Record { + promise_id: 1, + rid: 3, + result: 4, + }; + let buf = RecordBuf::from(expected); + if cfg!(target_endian = "little") { + assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); } + let actual = Record::from(buf); + assert_eq!(actual, expected); } diff --git a/tools/http_benchmark.py b/tools/http_benchmark.py index 05637612813a0..05cd542e3c041 100755 --- a/tools/http_benchmark.py +++ b/tools/http_benchmark.py @@ -82,9 +82,9 @@ def deno_http_proxy(deno_exe, hyper_hello_exe): origin_cmd=http_proxy_origin(hyper_hello_exe, origin_port)) -def deno_core_single(exe): - print "http_benchmark testing deno_core_single" - return run([exe, "--single-thread"], 4544) +def deno_core_http_bench(exe): + print "http_benchmark testing deno_core_http_bench" + return run([exe], 4544) def node_http(): @@ -132,8 +132,8 @@ def hyper_http(hyper_hello_exe): def http_benchmark(build_dir): hyper_hello_exe = os.path.join(build_dir, "hyper_hello") - core_http_bench_exe = os.path.join(build_dir, - "examples/deno_core_http_bench") + deno_core_http_bench_exe = os.path.join(build_dir, + "examples/deno_core_http_bench") deno_exe = os.path.join(build_dir, "deno") return { # "deno_tcp" was once called "deno" @@ -142,7 +142,8 @@ def http_benchmark(build_dir): "deno_http": deno_http(deno_exe), "deno_proxy": deno_http_proxy(deno_exe, hyper_hello_exe), "deno_proxy_tcp": deno_tcp_proxy(deno_exe, hyper_hello_exe), - "deno_core_single": deno_core_single(core_http_bench_exe), + # "deno_core_http_bench" was once called "deno_core_single" + "deno_core_http_bench": deno_core_http_bench(deno_core_http_bench_exe), # "node_http" was once called "node" "node_http": node_http(), "node_proxy": node_http_proxy(hyper_hello_exe),