From bd2a99c898f5bd72875afb0aee8fc6c791ff8df1 Mon Sep 17 00:00:00 2001 From: kallebysantos Date: Thu, 11 Jul 2024 12:40:59 +0100 Subject: [PATCH 01/27] feat: improved `install_onnx` script `install_onnx` script now supports `--gpu` flag to download runtime with cuda provider Signed-off-by: kallebysantos --- .devcontainer/devcontainer.json | 4 ++-- Dockerfile | 22 ++++++++++---------- scripts/install_onnx.sh | 17 ++++++++++++---- scripts/run_dind.sh | 36 ++++++++++++++++----------------- 4 files changed, 44 insertions(+), 35 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 3b0fcadd8..05d024879 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -4,7 +4,7 @@ "dockerfile": "Dockerfile", "context": "..", "args": { - "ONNXRUNTIME_VERSION": "1.17.0", + "ONNXRUNTIME_VERSION": "1.19.2", "DENO_VERSION": "1.45.2" } }, @@ -39,4 +39,4 @@ ] } } -} \ No newline at end of file +} diff --git a/Dockerfile b/Dockerfile index 606b11682..77c3734a7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ FROM rust:1.79.0-bookworm as builder ARG TARGETPLATFORM ARG GIT_V_VERSION -ARG ONNXRUNTIME_VERSION=1.17.0 +ARG ONNXRUNTIME_VERSION=1.19.2 ARG PROFILE=release ARG FEATURES @@ -15,18 +15,18 @@ WORKDIR /usr/src/edge-runtime COPY . . RUN --mount=type=cache,target=/usr/local/cargo/registry,id=${TARGETPLATFORM} --mount=type=cache,target=/usr/src/edge-runtime/target,id=${TARGETPLATFORM} \ - GIT_V_TAG=${GIT_V_VERSION} cargo build --profile ${PROFILE} --features "${FEATURES}" && \ - mv /usr/src/edge-runtime/target/${PROFILE}/edge-runtime /root + GIT_V_TAG=${GIT_V_VERSION} cargo build --profile ${PROFILE} --features "${FEATURES}" && \ + mv /usr/src/edge-runtime/target/${PROFILE}/edge-runtime /root RUN objcopy --compress-debug-sections \ - --only-keep-debug \ - /root/edge-runtime \ - /root/edge-runtime.debug + --only-keep-debug \ + /root/edge-runtime \ + /root/edge-runtime.debug RUN objcopy --strip-debug \ - --add-gnu-debuglink=/root/edge-runtime.debug \ - /root/edge-runtime + --add-gnu-debuglink=/root/edge-runtime.debug \ + /root/edge-runtime -RUN ./scripts/install_onnx.sh $ONNXRUNTIME_VERSION $TARGETPLATFORM /root/libonnxruntime.so +RUN ./scripts/install_onnx.sh $ONNXRUNTIME_VERSION $TARGETPLATFORM /root/onnxruntime RUN ./scripts/download_models.sh FROM debian:bookworm-slim @@ -36,10 +36,10 @@ RUN apt-get remove -y perl && apt-get autoremove -y COPY --from=builder /root/edge-runtime /usr/local/bin/edge-runtime COPY --from=builder /root/edge-runtime.debug /usr/local/bin/edge-runtime.debug -COPY --from=builder /root/libonnxruntime.so /usr/local/bin/libonnxruntime.so +COPY --from=builder /root/onnxruntime /usr/local/bin/onnxruntime COPY --from=builder /usr/src/edge-runtime/models /etc/sb_ai/models -ENV ORT_DYLIB_PATH=/usr/local/bin/libonnxruntime.so +ENV ORT_DYLIB_PATH=/usr/local/bin/onnxruntime/lib/libonnxruntime.so ENV SB_AI_MODELS_DIR=/etc/sb_ai/models ENTRYPOINT ["edge-runtime"] diff --git a/scripts/install_onnx.sh b/scripts/install_onnx.sh index 54d189dab..601e1cf73 100755 --- a/scripts/install_onnx.sh +++ b/scripts/install_onnx.sh @@ -1,9 +1,18 @@ #!/usr/bin/env bash -curl -O https://registry.npmjs.org/onnxruntime-node/-/onnxruntime-node-$1.tgz && tar zxvf onnxruntime-node-$1.tgz +ONNX_VERSION=${1:-1.19.2} +TARGETPLATFORM=$2 +SAVE_PATH=${3:-"./onnx-runtime"} -if [ "$2" == "linux/arm64" ]; then - mv ./package/bin/napi-v3/linux/arm64/libonnxruntime.so.$1 $3 +ONNX_DOWNLOAD_FILE="onnxruntime-linux" +ONNX_TARGET_PLATFORM=$([ "$TARGETPLATFORM" == "linux/arm64" ] && echo "aarch64" || echo "x64") + +if [[ $* == *"--gpu"* ]]; then + ONNX_DOWNLOAD_FILE="$ONNX_DOWNLOAD_FILE-$ONNX_TARGET_PLATFORM-gpu-$ONNX_VERSION" else - mv ./package/bin/napi-v3/linux/x64/libonnxruntime.so.$1 $3 + ONNX_DOWNLOAD_FILE="$ONNX_DOWNLOAD_FILE-$ONNX_TARGET_PLATFORM-$ONNX_VERSION" fi + +wget -qO- "https://github.com/microsoft/onnxruntime/releases/download/v${ONNX_VERSION}/${ONNX_DOWNLOAD_FILE}.tgz" | tar zxv + +mv "$ONNX_DOWNLOAD_FILE" "$SAVE_PATH" diff --git a/scripts/run_dind.sh b/scripts/run_dind.sh index a254bfa68..df1aedb85 100755 --- a/scripts/run_dind.sh +++ b/scripts/run_dind.sh @@ -2,15 +2,15 @@ GIT_V_TAG=0.1.1 EDGE_RUNTIME_PORT=9998 -ONNXRUNTIME_VERSION=1.17.0 +ONNXRUNTIME_VERSION=1.19.2 FEATURES=cli/tracing RUST_BACKTRACE=full SCRIPT=$(readlink -f "$0") SCRIPTPATH=$(dirname "$SCRIPT") -cd $SCRIPTPATH && \ - docker build \ +cd $SCRIPTPATH && + docker build \ -t edge_runtime \ --build-arg GIT_V_TAG=$GIT_V_TAG \ --build-arg ONNXRUNTIME_VERSION=$ONNXRUNTIME_VERSION \ @@ -19,18 +19,18 @@ cd $SCRIPTPATH && \ "$SCRIPTPATH/.." docker run \ - --privileged \ - --rm \ - -it \ - -p $EDGE_RUNTIME_PORT:$EDGE_RUNTIME_PORT \ - -w /home/deno \ - -v "$SCRIPTPATH/../examples:/home/deno/examples" \ - -e EDGE_RUNTIME_PORT=$EDGE_RUNTIME_PORT \ - -e RUST_BACKTRACE=$RUST_BACKTRACE \ - -e RUST_LOG=$RUST_LOG \ - edge_runtime:latest \ - start \ - -p $EDGE_RUNTIME_PORT \ - --main-service ./examples/main \ - --event-worker ./examples/event-manager \ - --static "./examples/**/*.bin" + --privileged \ + --rm \ + -it \ + -p $EDGE_RUNTIME_PORT:$EDGE_RUNTIME_PORT \ + -w /home/deno \ + -v "$SCRIPTPATH/../examples:/home/deno/examples" \ + -e EDGE_RUNTIME_PORT=$EDGE_RUNTIME_PORT \ + -e RUST_BACKTRACE=$RUST_BACKTRACE \ + -e RUST_LOG=$RUST_LOG \ + edge_runtime:latest \ + start \ + -p $EDGE_RUNTIME_PORT \ + --main-service ./examples/main \ + --event-worker ./examples/event-manager \ + --static "./examples/**/*.bin" From 797709cd276f34c797792f9f339b142142958933 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 15 Jul 2024 03:38:14 +0000 Subject: [PATCH 02/27] stamp: expose the received unix signal number when exiting --- crates/base/src/commands.rs | 2 +- crates/base/src/server.rs | 6 ++++-- crates/cli/src/main.rs | 26 ++++++++++++++++++++------ 3 files changed, 25 insertions(+), 9 deletions(-) diff --git a/crates/base/src/commands.rs b/crates/base/src/commands.rs index a1ecad47c..11eaccca4 100644 --- a/crates/base/src/commands.rs +++ b/crates/base/src/commands.rs @@ -26,7 +26,7 @@ pub async fn start_server( inspector_option: Option, jsx_specifier: Option, jsx_module: Option, -) -> Result<(), Error> { +) -> Result, Error> { let mut server = Server::new( ip, port, diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 9a48c6675..d420575a8 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -445,7 +445,7 @@ impl Server { self.termination_tokens.terminate().await; } - pub async fn listen(&mut self) -> Result<(), Error> { + pub async fn listen(&mut self) -> Result, Error> { let addr = SocketAddr::new(IpAddr::V4(self.ip), self.port); let non_secure_listener = TcpListener::bind(&addr).await?; let mut secure_listener = if let Some(tls) = self.tls.take() { @@ -462,6 +462,7 @@ impl Server { let termination_tokens = &self.termination_tokens; let input_termination_token = termination_tokens.input.as_ref(); + let mut ret = None::; let mut can_receive_event = false; let mut interrupted = false; let (event_tx, event_rx) = mpsc::unbounded_channel(); @@ -569,6 +570,7 @@ impl Server { signum = &mut terminate_signal_fut => { info!("shutdown signal received: {}", signum); + ret = Some(signum); break; } @@ -667,7 +669,7 @@ impl Server { warn!("runtime exits immediately since the graceful exit feature has been disabled"); } - Ok(()) + Ok(ret) } } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index af23720bd..eefb96c5c 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -23,9 +23,10 @@ use std::fs::File; use std::io::Write; use std::net::SocketAddr; use std::path::PathBuf; +use std::process::ExitCode; use std::sync::Arc; -fn main() -> Result<(), anyhow::Error> { +fn main() -> Result { resolve_deno_runtime_env(); let runtime = tokio::runtime::Builder::new_current_thread() @@ -36,7 +37,7 @@ fn main() -> Result<(), anyhow::Error> { // TODO: Tokio runtime shouldn't be needed here (Address later) let local = tokio::task::LocalSet::new(); - let res: Result<(), Error> = local.block_on(&runtime, async { + let res: Result = local.block_on(&runtime, async { let matches = get_cli().get_matches(); let verbose = matches.get_flag("verbose"); @@ -66,7 +67,7 @@ fn main() -> Result<(), anyhow::Error> { #[allow(clippy::single_match)] #[allow(clippy::arc_with_non_send_sync)] - match matches.subcommand() { + let exit_code = match matches.subcommand() { Some(("start", sub_matches)) => { let ip = sub_matches.get_one::("ip").cloned().unwrap(); let port = sub_matches.get_one::("port").copied().unwrap(); @@ -192,7 +193,7 @@ fn main() -> Result<(), anyhow::Error> { request_read_timeout_ms: maybe_request_read_timeout, }; - start_server( + let maybe_received_signum = start_server( ip.as_str(), port, maybe_tls, @@ -237,7 +238,12 @@ fn main() -> Result<(), anyhow::Error> { jsx_module, ) .await?; + + maybe_received_signum + .map(|it| ExitCode::from(it as u8)) + .unwrap_or_default() } + Some(("bundle", sub_matches)) => { let output_path = sub_matches.get_one::("output").cloned().unwrap(); let import_map_path = sub_matches.get_one::("import-map").cloned(); @@ -326,7 +332,10 @@ fn main() -> Result<(), anyhow::Error> { let mut file = File::create(output_path.as_str())?; file.write_all(&bin)? } + + ExitCode::SUCCESS } + Some(("unbundle", sub_matches)) => { let output_path = sub_matches.get_one::("output").cloned().unwrap(); let eszip_path = sub_matches.get_one::("eszip").cloned().unwrap(); @@ -340,12 +349,17 @@ fn main() -> Result<(), anyhow::Error> { output_path.to_str().unwrap() ); } + + ExitCode::SUCCESS } + _ => { // unrecognized command + ExitCode::FAILURE } - } - Ok(()) + }; + + Ok(exit_code) }); res From da4883ae01b94c29018ab0dd692c2d35702dc756 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 12 Jul 2024 00:29:05 +0000 Subject: [PATCH 03/27] chore: update dependencies Signed-off-by: kallebysantos --- Cargo.lock | 35 ++++++++++++++++++++++++++++++++++- Cargo.toml | 7 +++++-- crates/base_rt/Cargo.toml | 5 ++++- crates/cli/Cargo.toml | 4 ++-- crates/sb_ai/Cargo.toml | 18 ++++++++++++++++++ 5 files changed, 63 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22ad6cefe..34a6b0a75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -498,10 +498,12 @@ dependencies = [ name = "base_rt" version = "0.1.0" dependencies = [ + "cpu_timer", "deno_core", "once_cell", "tokio", "tokio-util", + "tracing", ] [[package]] @@ -912,6 +914,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cooked-waker" version = "5.0.0" @@ -1822,7 +1833,7 @@ version = "0.99.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version 0.4.0", @@ -2403,6 +2414,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -4968,17 +4988,30 @@ name = "sb_ai" version = "0.1.0" dependencies = [ "anyhow", + "base_rt", + "clap", + "convert_case 0.6.0", + "ctor", "deno_core", + "faster-hex", + "futures", + "futures-util", + "fxhash", "log", "ndarray", "ndarray-linalg", "once_cell", "ort", "rand", + "reqwest 0.12.4", + "scopeguard", "serde", "tokenizers", "tokio", + "tokio-util", "tracing", + "tracing-subscriber", + "xxhash-rust", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a44ab0b44..4a71c2605 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ ctor = "0.2.6" fastwebsockets = { version = "0.6", features = ["upgrade", "unstable-split"] } percent-encoding = "2.3.0" scopeguard = "1.2.0" +clap = { version = "4.0.29", features = ["cargo", "string", "env"] } glob = "0.3.1" httparse = "1.8.0" http = "1.0" @@ -110,7 +111,6 @@ faster-hex = "0.9" num-bigint = { version = "0.4", features = ["rand"] } notify = "=5.0.0" parking_lot = "0.12.0" -pin-project = "1.0.11" rustls = "0.22.4" rustls-pemfile = "2" rustls-tokio-stream = "=0.2.23" @@ -130,10 +130,13 @@ rand = "=0.8.5" signature = "2.1" spki = "0.7.2" urlencoding = "2.1.2" +pin-project = "1" +fxhash = "0.2" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing-log"] } rkyv = "0.7" tempfile = "3" +xxhash-rust = "0.8" [patch.crates-io] # If the PR is merged upstream, remove the line below. diff --git a/crates/base_rt/Cargo.toml b/crates/base_rt/Cargo.toml index 156c34655..11f4720d5 100644 --- a/crates/base_rt/Cargo.toml +++ b/crates/base_rt/Cargo.toml @@ -6,6 +6,9 @@ edition = "2021" [dependencies] deno_core.workspace = true +cpu_timer = { version = "0.1.0", path = "../cpu_timer" } tokio.workspace = true once_cell.workspace = true -tokio-util = { workspace = true, features = ["rt"] } \ No newline at end of file +tracing.workspace = true +tokio-util = { workspace = true, features = ["rt"] } + diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 38d0d2ffa..becdec027 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -21,9 +21,9 @@ log.workspace = true tokio.workspace = true glob.workspace = true once_cell.workspace = true -tracing-subscriber = { workspace = true, optional = true, features = ["env-filter", "tracing-log"] } +clap.workspace = true +tracing-subscriber = { workspace = true, optional = true } -clap = { version = "4.0.29", features = ["cargo", "string", "env", "derive"] } env_logger = "0.10.0" [features] diff --git a/crates/sb_ai/Cargo.toml b/crates/sb_ai/Cargo.toml index d9fdb0de5..e29c4cd47 100644 --- a/crates/sb_ai/Cargo.toml +++ b/crates/sb_ai/Cargo.toml @@ -9,19 +9,37 @@ license = "MIT" [lib] path = "lib.rs" +[[bin]] +name = "preload-sb-ai-defs" +path = "preload_defs.rs" + [dependencies] deno_core.workspace = true +base_rt = { version = "0.1.0", path = "../base_rt" } + anyhow.workspace = true log.workspace = true serde.workspace = true +futures.workspace = true +futures-util = { workspace = true, features = ["io"] } +fxhash.workspace = true +faster-hex.workspace = true +xxhash-rust = { workspace = true, features = ["std", "xxh3"] } +reqwest.workspace = true tokio.workspace = true +tokio-util = { workspace = true, features = ["compat"] } +clap = { workspace = true, features = ["derive"] } once_cell.workspace = true +ctor.workspace = true +scopeguard.workspace = true tracing.workspace = true +tracing-subscriber.workspace = true ndarray = "0.15" ndarray-linalg = "0.15" rand = "0.8" +convert_case = "0.6" tokenizers = { version = ">=0.13.4", default-features = false, features = [ "onig" ] } ort = { git = "https://github.com/pykeio/ort", default-features = false, features = [ "ndarray", "half", "load-dynamic" ] } From d67900aa5948f149d02375d2672677fb44695333 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 5 Jul 2024 05:15:28 +0000 Subject: [PATCH 04/27] stamp: init onnx runtime using `ctor` Signed-off-by: kallebysantos --- crates/sb_ai/Cargo.toml | 14 ++++++++------ crates/sb_ai/lib.rs | 15 +++------------ crates/sb_ai/onnx.rs | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 18 deletions(-) create mode 100644 crates/sb_ai/onnx.rs diff --git a/crates/sb_ai/Cargo.toml b/crates/sb_ai/Cargo.toml index e29c4cd47..3848c3f29 100644 --- a/crates/sb_ai/Cargo.toml +++ b/crates/sb_ai/Cargo.toml @@ -9,10 +9,6 @@ license = "MIT" [lib] path = "lib.rs" -[[bin]] -name = "preload-sb-ai-defs" -path = "preload_defs.rs" - [dependencies] deno_core.workspace = true @@ -40,6 +36,12 @@ ndarray = "0.15" ndarray-linalg = "0.15" rand = "0.8" convert_case = "0.6" -tokenizers = { version = ">=0.13.4", default-features = false, features = [ "onig" ] } +tokenizers = { version = ">=0.13.4", default-features = false, features = [ + "onig", +] } -ort = { git = "https://github.com/pykeio/ort", default-features = false, features = [ "ndarray", "half", "load-dynamic" ] } +ort = { git = "https://github.com/pykeio/ort", default-features = false, features = [ + "ndarray", + "half", + "load-dynamic", +] } diff --git a/crates/sb_ai/lib.rs b/crates/sb_ai/lib.rs index bf56e6000..48143592f 100644 --- a/crates/sb_ai/lib.rs +++ b/crates/sb_ai/lib.rs @@ -1,3 +1,4 @@ +mod onnx; use anyhow::anyhow; use anyhow::{bail, Error}; use deno_core::error::AnyError; @@ -6,7 +7,7 @@ use deno_core::{op2, V8CrossThreadTaskSpawner, V8TaskSpawner}; use log::error; use ndarray::{Array1, Array2, ArrayView3, Axis, Ix3}; use ndarray_linalg::norm::{normalize, NormalizeAxis}; -use once_cell::sync::Lazy; +use onnx::ensure_onnx_env_init; use ort::{inputs, GraphOptimizationLevel, Session}; use std::cell::RefCell; use std::path::{Path, PathBuf}; @@ -54,17 +55,7 @@ fn mean_pool(last_hidden_states: ArrayView3, attention_mask: ArrayView3 Result<(), Error> { - static ONNX_ENV_INIT: Lazy> = Lazy::new(|| { - // Create the ONNX Runtime environment, for all sessions created in this process. - if let Err(err) = ort::init().with_name("GTE").commit() { - error!("sb_ai: failed to create environment - {}", err); - return Some(err); - } - - None - }); - - if let Some(err) = &*ONNX_ENV_INIT { + if let Some(err) = ensure_onnx_env_init() { return Err(anyhow!("failed to create onnx environment: {err}")); } diff --git a/crates/sb_ai/onnx.rs b/crates/sb_ai/onnx.rs new file mode 100644 index 000000000..5a2809ecd --- /dev/null +++ b/crates/sb_ai/onnx.rs @@ -0,0 +1,33 @@ +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; + +use ctor::ctor; +use log::error; +use scopeguard::guard; +use tracing::instrument; + +static ONNX_INIT_ONNX_ENV_DONE: AtomicBool = AtomicBool::new(false); +static ONNX_INIT_RESULT: Mutex>> = Mutex::new(None); + +#[ctor] +#[instrument] +fn init_onnx_env() { + let mut guard1 = ONNX_INIT_RESULT.lock().unwrap(); + let mut _guard2 = guard(&ONNX_INIT_ONNX_ENV_DONE, |v| { + let _ = v.swap(true, Ordering::SeqCst); + }); + + // Create the ONNX Runtime environment, for all sessions created in this process. + // TODO: Add CUDA execution provider + if let Err(err) = ort::init().with_name("SB_AI_ONNX").commit() { + error!("sb_ai: failed to create environment: {}", err); + let _ = guard1.insert(Arc::new(err)); + } +} + +pub(crate) fn ensure_onnx_env_init() -> Option> { + assert!(ONNX_INIT_ONNX_ENV_DONE.load(Ordering::SeqCst)); + ONNX_INIT_RESULT.lock().unwrap().clone() +} From 5361bc4d1912bc3961e3baa161317810d5601e77 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 12 Jul 2024 01:42:44 +0000 Subject: [PATCH 05/27] stamp: don't propagate panic caused by library loading failure --- crates/sb_ai/onnx.rs | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/crates/sb_ai/onnx.rs b/crates/sb_ai/onnx.rs index 5a2809ecd..ea77ce5a7 100644 --- a/crates/sb_ai/onnx.rs +++ b/crates/sb_ai/onnx.rs @@ -19,11 +19,40 @@ fn init_onnx_env() { let _ = v.swap(true, Ordering::SeqCst); }); - // Create the ONNX Runtime environment, for all sessions created in this process. - // TODO: Add CUDA execution provider - if let Err(err) = ort::init().with_name("SB_AI_ONNX").commit() { - error!("sb_ai: failed to create environment: {}", err); - let _ = guard1.insert(Arc::new(err)); + let _guard3 = guard(std::panic::take_hook(), |v| { + std::panic::set_hook(v); + }); + + std::panic::set_hook(Box::new(|_| { + // no op + })); + + let result = std::panic::catch_unwind(|| { + // Create the ONNX Runtime environment, for all sessions created in this process. + // TODO: Add CUDA execution provider + ort::init().with_name("SB_AI_ONNX").commit() + }); + + match result { + Err(err) => { + // the most common reason that reaches to this arm is a library loading failure. + let _ = guard1.insert(Arc::new(ort::Error::CustomError(Box::from(match err + .downcast_ref::<&'static str>() + { + Some(s) => s, + None => match err.downcast_ref::() { + Some(s) => &s[..], + None => "unknown error", + }, + })))); + } + + Ok(Err(err)) => { + error!("sb_ai: failed to create environment: {}", err); + let _ = guard1.insert(Arc::new(err)); + } + + _ => {} } } From b1451e03b99f3ae24bcc59261c4c0d8e847887fc Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 17 Oct 2024 14:57:01 +0100 Subject: [PATCH 06/27] fix(sb_ai): reflect upstream api changes Signed-off-by: kallebysantos --- Cargo.lock | 14 ++++++++------ crates/sb_ai/lib.rs | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34a6b0a75..81cc2d19a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1120,7 +1120,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b28bfe653d79bd16c77f659305b195b82bb5ce0c0eb2a4846b82ddbd77586813" dependencies = [ "bitflags 2.5.0", - "libloading 0.8.1", + "libloading 0.7.4", "winapi", ] @@ -4014,21 +4014,23 @@ dependencies = [ [[package]] name = "ort" -version = "2.0.0-rc.0" -source = "git+https://github.com/pykeio/ort#2abc210f3958c5b94528cc71c6c8781813a2cc2f" +version = "2.0.0-rc.2" +source = "git+https://github.com/pykeio/ort#467d127c5877b099e1d0f605d38b74d221b6121c" dependencies = [ "half", + "js-sys", "libloading 0.8.1", "ndarray", "ort-sys", "thiserror", "tracing", + "web-sys", ] [[package]] name = "ort-sys" -version = "2.0.0-rc.0" -source = "git+https://github.com/pykeio/ort#2abc210f3958c5b94528cc71c6c8781813a2cc2f" +version = "2.0.0-rc.2" +source = "git+https://github.com/pykeio/ort#467d127c5877b099e1d0f605d38b74d221b6121c" [[package]] name = "os_pipe" @@ -7196,7 +7198,7 @@ dependencies = [ "js-sys", "khronos-egl", "libc", - "libloading 0.8.1", + "libloading 0.7.4", "log", "metal", "naga", diff --git a/crates/sb_ai/lib.rs b/crates/sb_ai/lib.rs index 48143592f..bf29feaa3 100644 --- a/crates/sb_ai/lib.rs +++ b/crates/sb_ai/lib.rs @@ -139,7 +139,7 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { }?) })?; - let embeddings = outputs["last_hidden_state"].extract_tensor()?; + let embeddings = outputs["last_hidden_state"].try_extract_tensor()?; let embeddings = embeddings.into_dimensionality::()?; let result = if do_mean_pooling { From 2e7ab5803d50823f65e66e95d3a3f0a8a8e7470c Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 30 Jul 2024 05:49:09 +0000 Subject: [PATCH 07/27] chore: update `scripts/run_dind.sh` Signed-off-by: kallebysantos --- scripts/run_dind.sh | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/run_dind.sh b/scripts/run_dind.sh index df1aedb85..a7dc93fc9 100755 --- a/scripts/run_dind.sh +++ b/scripts/run_dind.sh @@ -6,15 +6,17 @@ ONNXRUNTIME_VERSION=1.19.2 FEATURES=cli/tracing RUST_BACKTRACE=full +PWD=$(pwd) +PROFILE=${1:-dind} SCRIPT=$(readlink -f "$0") SCRIPTPATH=$(dirname "$SCRIPT") -cd $SCRIPTPATH && +cd "$SCRIPTPATH" && docker build \ -t edge_runtime \ --build-arg GIT_V_TAG=$GIT_V_TAG \ --build-arg ONNXRUNTIME_VERSION=$ONNXRUNTIME_VERSION \ - --build-arg PROFILE=dind \ + --build-arg PROFILE=$PROFILE \ --build-arg FEATURES=$FEATURES \ "$SCRIPTPATH/.." @@ -34,3 +36,5 @@ docker run \ --main-service ./examples/main \ --event-worker ./examples/event-manager \ --static "./examples/**/*.bin" + +cd $PWD From 717e6cf2c6a96efd77866322bc1301a0e500490d Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 00:50:10 +0000 Subject: [PATCH 08/27] chore(k6): bump k6 to 0.52.0 --- k6/scripts/setup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k6/scripts/setup.sh b/k6/scripts/setup.sh index 6765965a1..1e4e39b17 100755 --- a/k6/scripts/setup.sh +++ b/k6/scripts/setup.sh @@ -1,6 +1,6 @@ #!/bin/bash -K6_VERSION="v0.49.0" +K6_VERSION="v0.52.0" OS="$(uname -s)" if ! command -v npm &> /dev/null; then From 79af205cd10671ebee18f0f5cd31700f6f9d2415 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 00:51:09 +0000 Subject: [PATCH 09/27] chore(k6): update `setup.sh` --- k6/scripts/setup.sh | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/k6/scripts/setup.sh b/k6/scripts/setup.sh index 1e4e39b17..7e885ea52 100755 --- a/k6/scripts/setup.sh +++ b/k6/scripts/setup.sh @@ -1,14 +1,19 @@ #!/bin/bash +PWD=$(pwd) K6_VERSION="v0.52.0" OS="$(uname -s)" +SCRIPT=$(readlink -f "$0") +SCRIPTPATH=$(dirname "$SCRIPT") + +cd $SCRIPTPATH if ! command -v npm &> /dev/null; then echo "npm is not installed" exit 1 fi -if ! command -v k6 &> /dev/null +if ! command -v k6 &> /dev/null || [[ "$1" = "-f" ]]; then case "${OS}" in Linux*) @@ -61,3 +66,5 @@ export K6_TARGET="http://127.0.0.1:9998" export K6_RUN_PROFILE="performance" npm install + +cd $PWD \ No newline at end of file From 6bdbde359541c913b9e1b56a65030241b3260216 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 00:51:25 +0000 Subject: [PATCH 10/27] chore(k6): update `tsconfig.json` --- k6/tsconfig.json | 1 + 1 file changed, 1 insertion(+) diff --git a/k6/tsconfig.json b/k6/tsconfig.json index 80366662e..e7d63a03b 100644 --- a/k6/tsconfig.json +++ b/k6/tsconfig.json @@ -22,6 +22,7 @@ "allowSyntheticDefaultImports": true, "experimentalDecorators": true, "emitDecoratorMetadata": true, + "downlevelIteration": true, "allowJs": true, "strict": true, From 085a510c2843897ce442dedf997efef285731524 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 00:51:38 +0000 Subject: [PATCH 11/27] chore(k6): add dependency --- k6/package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/k6/package.json b/k6/package.json index d33e37d44..d931111c0 100644 --- a/k6/package.json +++ b/k6/package.json @@ -19,5 +19,8 @@ "rollup-plugin-copy": "^3.5.0", "typescript": "^5.4.2", "vite": "^5.1.7" + }, + "dependencies": { + "@faker-js/faker": "^8.4.1" } } From 909a59f2cf852e791fb7729e36b037858a04793e Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 00:51:48 +0000 Subject: [PATCH 12/27] chore(k6): update `package-lock.json` --- k6/package-lock.json | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/k6/package-lock.json b/k6/package-lock.json index aba9bd96a..1f0de1faa 100644 --- a/k6/package-lock.json +++ b/k6/package-lock.json @@ -9,6 +9,9 @@ "version": "1.0.0", "hasInstallScript": true, "license": "ISC", + "dependencies": { + "@faker-js/faker": "^8.4.1" + }, "devDependencies": { "@babel/core": "^7.24.0", "@rollup/plugin-babel": "^6.0.4", @@ -689,6 +692,21 @@ "node": ">=12" } }, + "node_modules/@faker-js/faker": { + "version": "8.4.1", + "resolved": "https://registry.npmjs.org/@faker-js/faker/-/faker-8.4.1.tgz", + "integrity": "sha512-XQ3cU+Q8Uqmrbf2e0cIC/QN43sTBSC8KF12u29Mb47tWrt2hAgBXSgpZMj4Ao8Uk0iJcU99QsOCaIL8934obCg==", + "funding": [ + { + "type": "opencollective", + "url": "https://opencollective.com/fakerjs" + } + ], + "engines": { + "node": "^14.17.0 || ^16.13.0 || >=18.0.0", + "npm": ">=6.14.13" + } + }, "node_modules/@jridgewell/gen-mapping": { "version": "0.3.5", "resolved": "https://registry.npmjs.org/@jridgewell/gen-mapping/-/gen-mapping-0.3.5.tgz", From 2bb4fb5d17882fcc5c8e1c4416fab15c2e5509da Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 00:52:31 +0000 Subject: [PATCH 13/27] fix(k6): make gte scenario more robustly --- k6/generators.ts | 5 +++++ k6/specs/gte.ts | 19 +++++++++++++++++-- k6/specs/may-upload-failure.ts | 2 +- 3 files changed, 23 insertions(+), 3 deletions(-) create mode 100644 k6/generators.ts diff --git a/k6/generators.ts b/k6/generators.ts new file mode 100644 index 000000000..a40c9ad33 --- /dev/null +++ b/k6/generators.ts @@ -0,0 +1,5 @@ +import { faker } from "@faker-js/faker"; + +export function makeText(len: number): string[] { + return [...Array(len).keys()].map(_ => faker.lorem.text()); +} \ No newline at end of file diff --git a/k6/specs/gte.ts b/k6/specs/gte.ts index 22288e766..ca1321fa0 100644 --- a/k6/specs/gte.ts +++ b/k6/specs/gte.ts @@ -18,6 +18,9 @@ import { Options } from "k6/options"; import { target } from "../config"; +/** @ts-ignore */ +import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.2.0/index.js"; + export const options: Options = { scenarios: { simple: { @@ -28,11 +31,23 @@ export const options: Options = { } }; -export default function gte() { +const GENERATORS = import("../generators"); + +export async function setup() { + const pkg = await GENERATORS; + return { + words: pkg.makeText(1000) + } +} + +export default function gte(data: { words: string[] }) { + const wordIdx = randomIntBetween(0, data.words.length - 1); + + console.debug(`WORD[${wordIdx}]: ${data.words[wordIdx]}`); const res = http.post( `${target}/k6-gte`, JSON.stringify({ - "text_for_embedding": "meow" + "text_for_embedding": data.words[wordIdx] }) ); diff --git a/k6/specs/may-upload-failure.ts b/k6/specs/may-upload-failure.ts index a8bed5c5e..b76e04808 100644 --- a/k6/specs/may-upload-failure.ts +++ b/k6/specs/may-upload-failure.ts @@ -6,7 +6,7 @@ import { Options } from "k6/options"; import { target } from "../config"; /** @ts-ignore */ -import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.2.0/index.js" +import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.2.0/index.js"; const MB = 1048576; const MSG_CANCELED = "WorkerRequestCancelled: request has been cancelled by supervisor"; From d74379178bd0564f124c190eec38ecfb5ee8e799 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 9 Aug 2024 03:26:35 +0000 Subject: [PATCH 14/27] fix(k6): add a test for request cancelled to gte scenario --- k6/constants.ts | 1 + k6/specs/gte.ts | 17 +++++++++++++++-- k6/specs/may-upload-failure.ts | 2 +- 3 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 k6/constants.ts diff --git a/k6/constants.ts b/k6/constants.ts new file mode 100644 index 000000000..1e104527c --- /dev/null +++ b/k6/constants.ts @@ -0,0 +1 @@ +export const MSG_CANCELED = "WorkerRequestCancelled: request has been cancelled by supervisor"; \ No newline at end of file diff --git a/k6/specs/gte.ts b/k6/specs/gte.ts index ca1321fa0..e472d791a 100644 --- a/k6/specs/gte.ts +++ b/k6/specs/gte.ts @@ -13,13 +13,14 @@ EDGE_RUNTIME_PORT=9998 RUST_BACKTRACE=full ./target/debug/edge-runtime "$@" star import http from "k6/http"; -import { check } from "k6"; +import { check, fail } from "k6"; import { Options } from "k6/options"; import { target } from "../config"; /** @ts-ignore */ import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.2.0/index.js"; +import { MSG_CANCELED } from "../constants"; export const options: Options = { scenarios: { @@ -51,7 +52,19 @@ export default function gte(data: { words: string[] }) { }) ); - check(res, { + const isOk = check(res, { "status is 200": r => r.status === 200 }); + + const isRequestCancelled = check(res, { + "request cancelled": r => { + const msg = r.json("msg"); + return r.status === 500 && msg === MSG_CANCELED; + } + }); + + if (!isOk && !isRequestCancelled) { + console.log(res.body); + fail("unexpected response"); + } } \ No newline at end of file diff --git a/k6/specs/may-upload-failure.ts b/k6/specs/may-upload-failure.ts index b76e04808..a4a8e12e5 100644 --- a/k6/specs/may-upload-failure.ts +++ b/k6/specs/may-upload-failure.ts @@ -4,12 +4,12 @@ import { check } from "k6"; import { Options } from "k6/options"; import { target } from "../config"; +import { MSG_CANCELED } from "../constants"; /** @ts-ignore */ import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.2.0/index.js"; const MB = 1048576; -const MSG_CANCELED = "WorkerRequestCancelled: request has been cancelled by supervisor"; const dummyBinary = new Uint8Array(randomIntBetween(25 * MB, 35 * MB)); const dummyFile = http.file(dummyBinary, "dummy", "application/octet-stream"); From 893ebb00688f4aa132d8468d76fa3d0862a7c636 Mon Sep 17 00:00:00 2001 From: kallebysantos Date: Thu, 17 Oct 2024 16:12:56 +0100 Subject: [PATCH 15/27] stamp: optimize pipeline loading - Using `HashMap` allows the reusing of sessions between requests. Signed-off-by: kallebysantos --- crates/sb_ai/Cargo.toml | 1 + crates/sb_ai/lib.rs | 25 ++++-------- crates/sb_ai/session.rs | 85 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 17 deletions(-) create mode 100644 crates/sb_ai/session.rs diff --git a/crates/sb_ai/Cargo.toml b/crates/sb_ai/Cargo.toml index 3848c3f29..fc146050a 100644 --- a/crates/sb_ai/Cargo.toml +++ b/crates/sb_ai/Cargo.toml @@ -44,4 +44,5 @@ ort = { git = "https://github.com/pykeio/ort", default-features = false, feature "ndarray", "half", "load-dynamic", + "cuda", ] } diff --git a/crates/sb_ai/lib.rs b/crates/sb_ai/lib.rs index bf29feaa3..d350402f2 100644 --- a/crates/sb_ai/lib.rs +++ b/crates/sb_ai/lib.rs @@ -1,4 +1,6 @@ mod onnx; +mod session; + use anyhow::anyhow; use anyhow::{bail, Error}; use deno_core::error::AnyError; @@ -8,9 +10,10 @@ use log::error; use ndarray::{Array1, Array2, ArrayView3, Axis, Ix3}; use ndarray_linalg::norm::{normalize, NormalizeAxis}; use onnx::ensure_onnx_env_init; -use ort::{inputs, GraphOptimizationLevel, Session}; +use ort::inputs; +use session::load_session_from_file; use std::cell::RefCell; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::rc::Rc; use std::sync::Arc; use tokenizers::Tokenizer; @@ -37,15 +40,6 @@ struct GteModelRequest { result_tx: mpsc::UnboundedSender, Error>>, } -fn create_session(model_file_path: PathBuf) -> Result { - let session = Session::builder()? - .with_optimization_level(GraphOptimizationLevel::Level3)? - .with_intra_threads(1)? - .commit_from_file(model_file_path)?; - - Ok(session) -} - fn mean_pool(last_hidden_states: ArrayView3, attention_mask: ArrayView3) -> Array2 { let masked_hidden_states = last_hidden_states.into_owned() * &attention_mask.mapv(|x| x as f32); let sum_hidden_states = masked_hidden_states.sum_axis(Axis(1)); @@ -55,10 +49,6 @@ fn mean_pool(last_hidden_states: ArrayView3, attention_mask: ArrayView3 Result<(), Error> { - if let Some(err) = ensure_onnx_env_init() { - return Err(anyhow!("failed to create onnx environment: {err}")); - } - let spawner = state.borrow::().clone(); let cross_thread_spawner = state.borrow::().clone(); @@ -69,7 +59,8 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { state.put::>(req_tx); spawner.spawn(move |_| { - let session = create_session(Path::new(&models_dir).join("gte-small").join("model.onnx")); + let session = + load_session_from_file(Path::new(&models_dir).join("gte-small").join("model.onnx")); if session.is_err() { let err = session.as_ref().unwrap_err(); @@ -77,7 +68,7 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { return; } - let session = session.unwrap(); + let (_, session) = session.unwrap(); let tokenizer = Tokenizer::from_file( Path::new(&models_dir) .join("gte-small") diff --git a/crates/sb_ai/session.rs b/crates/sb_ai/session.rs new file mode 100644 index 000000000..ec6f01292 --- /dev/null +++ b/crates/sb_ai/session.rs @@ -0,0 +1,85 @@ +use deno_core::error::AnyError; +use once_cell::sync::Lazy; +use std::collections::HashMap; +use std::sync::Mutex; +use std::{path::PathBuf, sync::Arc}; + +use anyhow::{anyhow, Error}; +use ort::{ + CPUExecutionProvider, ExecutionProviderDispatch, GraphOptimizationLevel, Session, + SessionBuilder, +}; + +use crate::onnx::ensure_onnx_env_init; + +static SESSIONS: Lazy>>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +pub(crate) fn get_session_builder() -> Result { + let orm_threads = std::env::var("OMP_NUM_THREADS") + .map_or(None, |val| val.parse::().ok()) + .unwrap_or(1); + + let builder = Session::builder()? + .with_optimization_level(GraphOptimizationLevel::Level3)? + // NOTE(Nyannyacha): This is set to prevent memory leaks caused by different input + // shapes. + // + // Backgrounds: + // [1]: https://github.com/microsoft/onnxruntime/issues/11118 + // [2]: https://github.com/microsoft/onnxruntime/blob/main/onnxruntime/core/framework/session_options.h#L95-L110 + .with_memory_pattern(false)? + .with_intra_threads(orm_threads)?; + + Ok(builder) +} + +fn cpu_execution_provider() -> Box> { + Box::new( + [ + // NOTE(Nyannacha): See the comment above. This makes `enable_cpu_mem_arena` set to + // False. + // + // Backgrounds: + // [1]: https://docs.rs/ort/2.0.0-rc.4/src/ort/execution_providers/cpu.rs.html#9-18 + // [2]: https://docs.rs/ort/2.0.0-rc.4/src/ort/execution_providers/cpu.rs.html#46-50 + CPUExecutionProvider::default().build(), + ] + .into_iter(), + ) +} + +fn create_session(model_bytes: &[u8]) -> Result, Error> { + let session = { + if let Some(err) = ensure_onnx_env_init() { + return Err(anyhow!("failed to create onnx environment: {err}")); + } + + get_session_builder()? + .with_execution_providers(cpu_execution_provider())? + .commit_from_memory(model_bytes)? + }; + + let session = Arc::new(session); + + Ok(session) +} + +pub(crate) fn load_session_from_file( + model_file_path: PathBuf, +) -> Result<(String, Arc), Error> { + let session_id = fxhash::hash(&model_file_path.to_string_lossy()).to_string(); + + let mut sessions = SESSIONS.lock().unwrap(); + + if let Some(session) = sessions.get(&session_id) { + return Ok((session_id, session.clone())); + } + let model_bytes = std::fs::read(model_file_path)?; + + let session = create_session(model_bytes.as_slice())?; + + sessions.insert(session_id.to_owned(), session.clone()); + + Ok((session_id, session)) +} From b0c8f6cab80e08567812a107e6f11ac1a127c810 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sun, 27 Oct 2024 14:25:20 +0000 Subject: [PATCH 16/27] stamp: introduce session cleanup logic Signed-off-by: kallebysantos --- crates/sb_ai/js/ai.js | 13 ++++++++++++- crates/sb_ai/lib.rs | 16 ++++++++++++---- crates/sb_ai/session.rs | 30 +++++++++++++++++++++++++++++- crates/sb_core/js/bootstrap.js | 2 +- crates/sb_core/js/main_worker.js | 2 ++ 5 files changed, 56 insertions(+), 7 deletions(-) diff --git a/crates/sb_ai/js/ai.js b/crates/sb_ai/js/ai.js index 35b933f7a..9780a400a 100644 --- a/crates/sb_ai/js/ai.js +++ b/crates/sb_ai/js/ai.js @@ -239,4 +239,15 @@ class Session { } } -export default { Session }; +const MAIN_WORKER_API = { + tryCleanupUnusedSession: () => core.ops.op_sb_ai_try_cleanup_unused_session(), +}; + +const USER_WORKER_API = { + Session, +}; + +export { + MAIN_WORKER_API, + USER_WORKER_API, +}; diff --git a/crates/sb_ai/lib.rs b/crates/sb_ai/lib.rs index d350402f2..c79f26424 100644 --- a/crates/sb_ai/lib.rs +++ b/crates/sb_ai/lib.rs @@ -1,15 +1,13 @@ mod onnx; mod session; -use anyhow::anyhow; use anyhow::{bail, Error}; use deno_core::error::AnyError; -use deno_core::OpState; use deno_core::{op2, V8CrossThreadTaskSpawner, V8TaskSpawner}; +use deno_core::{serde_json, OpState}; use log::error; use ndarray::{Array1, Array2, ArrayView3, Axis, Ix3}; use ndarray_linalg::norm::{normalize, NormalizeAxis}; -use onnx::ensure_onnx_env_init; use ort::inputs; use session::load_session_from_file; use std::cell::RefCell; @@ -24,7 +22,11 @@ use tracing::trace_span; deno_core::extension!( sb_ai, - ops = [op_sb_ai_run_model, op_sb_ai_init_model], + ops = [ + op_sb_ai_run_model, + op_sb_ai_init_model, + op_sb_ai_try_cleanup_unused_session + ], esm_entry_point = "ext:sb_ai/js/ai.js", esm = [ "js/ai.js", @@ -228,3 +230,9 @@ pub async fn op_sb_ai_run_model( bail!("model not supported") } } + +#[op2(fast)] +#[bigint] +pub fn op_sb_ai_try_cleanup_unused_session() -> Result { + session::cleanup() +} diff --git a/crates/sb_ai/session.rs b/crates/sb_ai/session.rs index ec6f01292..42b59b5db 100644 --- a/crates/sb_ai/session.rs +++ b/crates/sb_ai/session.rs @@ -1,10 +1,13 @@ use deno_core::error::AnyError; +use deno_core::serde_json; use once_cell::sync::Lazy; +use serde::Serialize; use std::collections::HashMap; use std::sync::Mutex; +use std::usize; use std::{path::PathBuf, sync::Arc}; -use anyhow::{anyhow, Error}; +use anyhow::{anyhow, Context, Error}; use ort::{ CPUExecutionProvider, ExecutionProviderDispatch, GraphOptimizationLevel, Session, SessionBuilder, @@ -83,3 +86,28 @@ pub(crate) fn load_session_from_file( Ok((session_id, session)) } + +pub fn cleanup() -> Result { + let mut remove_counter = 0; + { + let mut guard = SESSIONS.lock().unwrap(); + let mut to_be_removed = vec![]; + + for (key, session) in &mut *guard { + if Arc::strong_count(session) > 1 { + continue; + } + + to_be_removed.push(key.clone()); + } + + for key in to_be_removed { + let old_store = guard.remove(&key); + debug_assert!(old_store.is_some()); + + remove_counter += 1; + } + } + + Ok(remove_counter) +} diff --git a/crates/sb_core/js/bootstrap.js b/crates/sb_core/js/bootstrap.js index 5e3ed0282..baa13409e 100644 --- a/crates/sb_core/js/bootstrap.js +++ b/crates/sb_core/js/bootstrap.js @@ -21,7 +21,7 @@ import * as response from 'ext:deno_fetch/23_response.js'; import * as request from 'ext:deno_fetch/23_request.js'; import * as globalInterfaces from 'ext:deno_web/04_global_interfaces.js'; import { SUPABASE_ENV } from 'ext:sb_env/env.js'; -import ai from 'ext:sb_ai/js/ai.js'; +import { USER_WORKER_API as ai } from 'ext:sb_ai/js/ai.js'; import { registerErrors } from 'ext:sb_core_main_js/js/errors.js'; import { formatException, diff --git a/crates/sb_core/js/main_worker.js b/crates/sb_core/js/main_worker.js index eb0f210b4..68b566426 100644 --- a/crates/sb_core/js/main_worker.js +++ b/crates/sb_core/js/main_worker.js @@ -1,3 +1,4 @@ +import { MAIN_WORKER_API as ai } from 'ext:sb_ai/js/ai.js'; import { SUPABASE_USER_WORKERS } from 'ext:sb_user_workers/user_workers.js'; import { applySupabaseTag } from 'ext:sb_core_main_js/js/http.js'; import { core } from 'ext:core/mod.js'; @@ -7,6 +8,7 @@ const ops = core.ops; Object.defineProperty(globalThis, 'EdgeRuntime', { get() { return { + ai, userWorkers: SUPABASE_USER_WORKERS, getRuntimeMetrics: () => /* async */ ops.op_runtime_metrics(), applySupabaseTag: (src, dest) => applySupabaseTag(src, dest), From 185ffba11bd65aafd2c6308f771ad67816fc4286 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 1 Aug 2024 02:26:57 +0000 Subject: [PATCH 17/27] chore: update `examples/main.ts` Signed-off-by: kallebysantos --- examples/main/index.ts | 355 +++++++++++++++++++++-------------------- 1 file changed, 186 insertions(+), 169 deletions(-) diff --git a/examples/main/index.ts b/examples/main/index.ts index 5bc361c11..ee1c4023e 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -8,174 +8,191 @@ console.log('main function started'); // log system memory usage every 30s // setInterval(() => console.log(EdgeRuntime.systemMemoryInfo()), 30 * 1000); +// cleanup unused sessions every 30s +// setInterval(async () => { +// const { activeUserWorkersCount } = await EdgeRuntime.getRuntimeMetrics(); +// if (activeUserWorkersCount > 0) { +// return; +// } +// try { +// const cleanupCount = await EdgeRuntime.ai.tryCleanupUnusedSession(); +// if (cleanupCount == 0) { +// return; +// } +// console.log('EdgeRuntime.ai.tryCleanupUnusedSession', cleanupCount); +// } catch (e) { +// console.error(e.toString()); +// } +// }, 30 * 1000); + Deno.serve(async (req: Request) => { - const headers = new Headers({ - 'Content-Type': 'application/json', - }); - - const url = new URL(req.url); - const { pathname } = url; - - // handle health checks - if (pathname === '/_internal/health') { - return new Response( - JSON.stringify({ 'message': 'ok' }), - { - status: STATUS_CODE.OK, - headers, - }, - ); - } - - // if (pathname === '/_internal/segfault') { - // EdgeRuntime.raiseSegfault(); - // return new Response( - // JSON.stringify({ 'message': 'ok' }), - // { - // status: STATUS_CODE.OK, - // headers, - // }, - // ); - // } - - if (pathname === '/_internal/metric') { - const metric = await EdgeRuntime.getRuntimeMetrics(); - return Response.json(metric); - } - - // NOTE: You can test WebSocket in the main worker by uncommenting below. - // if (pathname === '/_internal/ws') { - // const upgrade = req.headers.get("upgrade") || ""; - - // if (upgrade.toLowerCase() != "websocket") { - // return new Response("request isn't trying to upgrade to websocket."); - // } - - // const { socket, response } = Deno.upgradeWebSocket(req); - - // socket.onopen = () => console.log("socket opened"); - // socket.onmessage = (e) => { - // console.log("socket message:", e.data); - // socket.send(new Date().toString()); - // }; - - // socket.onerror = e => console.log("socket errored:", e.message); - // socket.onclose = () => console.log("socket closed"); - - // return response; // 101 (Switching Protocols) - // } - - const REGISTRY_PREFIX = '/_internal/registry'; - if (pathname.startsWith(REGISTRY_PREFIX)) { - return await handleRegistryRequest(REGISTRY_PREFIX, req); - } - - const path_parts = pathname.split('/'); - const service_name = path_parts[1]; - - if (!service_name || service_name === '') { - const error = { msg: 'missing function name in request' }; - return new Response( - JSON.stringify(error), - { status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } }, - ); - } - - const servicePath = `./examples/${service_name}`; - // console.error(`serving the request with ${servicePath}`); - - const createWorker = async () => { - const memoryLimitMb = 150; - const workerTimeoutMs = 5 * 60 * 1000; - const noModuleCache = false; - - // you can provide an import map inline - // const inlineImportMap = { - // imports: { - // "std/": "https://deno.land/std@0.131.0/", - // "cors": "./examples/_shared/cors.ts" - // } - // } - - // const importMapPath = `data:${encodeURIComponent(JSON.stringify(importMap))}?${encodeURIComponent('/home/deno/functions/test')}`; - const importMapPath = null; - const envVarsObj = Deno.env.toObject(); - const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); - const forceCreate = false; - const netAccessDisabled = false; - - // load source from an eszip - //const maybeEszip = await Deno.readFile('./bin.eszip'); - //const maybeEntrypoint = 'file:///src/index.ts'; - - // const maybeEntrypoint = 'file:///src/index.ts'; - // or load module source from an inline module - // const maybeModuleCode = 'Deno.serve((req) => new Response("Hello from Module Code"));'; - // - const cpuTimeSoftLimitMs = 10000; - const cpuTimeHardLimitMs = 20000; - - return await EdgeRuntime.userWorkers.create({ - servicePath, - memoryLimitMb, - workerTimeoutMs, - noModuleCache, - importMapPath, - envVars, - forceCreate, - netAccessDisabled, - cpuTimeSoftLimitMs, - cpuTimeHardLimitMs, - // maybeEszip, - // maybeEntrypoint, - // maybeModuleCode, - }); - }; - - const callWorker = async () => { - try { - // If a worker for the given service path already exists, - // it will be reused by default. - // Update forceCreate option in createWorker to force create a new worker for each request. - const worker = await createWorker(); - const controller = new AbortController(); - - const signal = controller.signal; - // Optional: abort the request after a timeout - //setTimeout(() => controller.abort(), 2 * 60 * 1000); - - return await worker.fetch(req, { signal }); - } catch (e) { - console.error(e); - - if (e instanceof Deno.errors.WorkerRequestCancelled) { - headers.append('Connection', 'close'); - - // XXX(Nyannyacha): I can't think right now how to re-poll - // inside the worker pool without exposing the error to the - // surface. - - // It is satisfied when the supervisor that handled the original - // request terminated due to reaches such as CPU time limit or - // Wall-clock limit. - // - // The current request to the worker has been canceled due to - // some internal reasons. We should repoll the worker and call - // `fetch` again. - - // return await callWorker(); - } - - const error = { msg: e.toString() }; - return new Response( - JSON.stringify(error), - { - status: STATUS_CODE.InternalServerError, - headers, - }, - ); - } - }; - - return callWorker(); + const headers = new Headers({ + 'Content-Type': 'application/json', + }); + + const url = new URL(req.url); + const { pathname } = url; + + // handle health checks + if (pathname === '/_internal/health') { + return new Response( + JSON.stringify({ 'message': 'ok' }), + { + status: STATUS_CODE.OK, + headers, + }, + ); + } + + // if (pathname === '/_internal/segfault') { + // EdgeRuntime.raiseSegfault(); + // return new Response( + // JSON.stringify({ 'message': 'ok' }), + // { + // status: STATUS_CODE.OK, + // headers, + // }, + // ); + // } + + if (pathname === '/_internal/metric') { + const metric = await EdgeRuntime.getRuntimeMetrics(); + return Response.json(metric); + } + + // NOTE: You can test WebSocket in the main worker by uncommenting below. + // if (pathname === '/_internal/ws') { + // const upgrade = req.headers.get("upgrade") || ""; + + // if (upgrade.toLowerCase() != "websocket") { + // return new Response("request isn't trying to upgrade to websocket."); + // } + + // const { socket, response } = Deno.upgradeWebSocket(req); + + // socket.onopen = () => console.log("socket opened"); + // socket.onmessage = (e) => { + // console.log("socket message:", e.data); + // socket.send(new Date().toString()); + // }; + + // socket.onerror = e => console.log("socket errored:", e.message); + // socket.onclose = () => console.log("socket closed"); + + // return response; // 101 (Switching Protocols) + // } + + const REGISTRY_PREFIX = '/_internal/registry'; + if (pathname.startsWith(REGISTRY_PREFIX)) { + return await handleRegistryRequest(REGISTRY_PREFIX, req); + } + + const path_parts = pathname.split('/'); + const service_name = path_parts[1]; + + if (!service_name || service_name === '') { + const error = { msg: 'missing function name in request' }; + return new Response( + JSON.stringify(error), + { status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } }, + ); + } + + const servicePath = `./examples/${service_name}`; + // console.error(`serving the request with ${servicePath}`); + + const createWorker = async () => { + const memoryLimitMb = 150; + const workerTimeoutMs = 5 * 60 * 1000; + const noModuleCache = false; + + // you can provide an import map inline + // const inlineImportMap = { + // imports: { + // "std/": "https://deno.land/std@0.131.0/", + // "cors": "./examples/_shared/cors.ts" + // } + // } + + // const importMapPath = `data:${encodeURIComponent(JSON.stringify(importMap))}?${encodeURIComponent('/home/deno/functions/test')}`; + const importMapPath = null; + const envVarsObj = Deno.env.toObject(); + const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); + const forceCreate = false; + const netAccessDisabled = false; + + // load source from an eszip + //const maybeEszip = await Deno.readFile('./bin.eszip'); + //const maybeEntrypoint = 'file:///src/index.ts'; + + // const maybeEntrypoint = 'file:///src/index.ts'; + // or load module source from an inline module + // const maybeModuleCode = 'Deno.serve((req) => new Response("Hello from Module Code"));'; + // + const cpuTimeSoftLimitMs = 10000; + const cpuTimeHardLimitMs = 20000; + + return await EdgeRuntime.userWorkers.create({ + servicePath, + memoryLimitMb, + workerTimeoutMs, + noModuleCache, + importMapPath, + envVars, + forceCreate, + netAccessDisabled, + cpuTimeSoftLimitMs, + cpuTimeHardLimitMs, + // maybeEszip, + // maybeEntrypoint, + // maybeModuleCode, + }); + }; + + const callWorker = async () => { + try { + // If a worker for the given service path already exists, + // it will be reused by default. + // Update forceCreate option in createWorker to force create a new worker for each request. + const worker = await createWorker(); + const controller = new AbortController(); + + const signal = controller.signal; + // Optional: abort the request after a timeout + //setTimeout(() => controller.abort(), 2 * 60 * 1000); + + return await worker.fetch(req, { signal }); + } catch (e) { + console.error(e); + + if (e instanceof Deno.errors.WorkerRequestCancelled) { + headers.append('Connection', 'close'); + + // XXX(Nyannyacha): I can't think right now how to re-poll + // inside the worker pool without exposing the error to the + // surface. + + // It is satisfied when the supervisor that handled the original + // request terminated due to reaches such as CPU time limit or + // Wall-clock limit. + // + // The current request to the worker has been canceled due to + // some internal reasons. We should repoll the worker and call + // `fetch` again. + + // return await callWorker(); + } + + const error = { msg: e.toString() }; + return new Response( + JSON.stringify(error), + { + status: STATUS_CODE.InternalServerError, + headers, + }, + ); + } + }; + + return callWorker(); }); From ce29ee9773da68f97b0d7d6b396c4e9be815dc1a Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 9 Aug 2024 01:17:28 +0000 Subject: [PATCH 18/27] stamp: move `DenoRuntimeDropToken` to `base_rt` crate --- crates/base/src/deno_runtime.rs | 2 +- crates/base_rt/src/lib.rs | 13 +++++++++++++ crates/sb_core/conn_sync.rs | 3 --- crates/sb_core/net.rs | 3 +-- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 5053a2f39..37412cfc4 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -24,7 +24,7 @@ use futures_util::future::poll_fn; use futures_util::task::AtomicWaker; use log::{error, trace}; use once_cell::sync::{Lazy, OnceCell}; -use sb_core::conn_sync::DenoRuntimeDropToken; +use base_rt::DenoRuntimeDropToken; use sb_core::http::sb_core_http; use sb_core::http_start::sb_core_http_start; use sb_core::util::sync::AtomicFlag; diff --git a/crates/base_rt/src/lib.rs b/crates/base_rt/src/lib.rs index 4a04d30ad..51848da3d 100644 --- a/crates/base_rt/src/lib.rs +++ b/crates/base_rt/src/lib.rs @@ -1,6 +1,7 @@ use std::num::NonZeroUsize; use once_cell::sync::Lazy; +use tokio_util::sync::CancellationToken; pub mod error; @@ -58,3 +59,15 @@ pub static USER_WORKER_RT: Lazy = Lazy::new(| ) }) }); + +#[derive(Clone)] +pub struct DenoRuntimeDropToken(pub CancellationToken); + +impl std::ops::Deref for DenoRuntimeDropToken { + type Target = CancellationToken; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + diff --git a/crates/sb_core/conn_sync.rs b/crates/sb_core/conn_sync.rs index 5cefda5d5..8926c3f24 100644 --- a/crates/sb_core/conn_sync.rs +++ b/crates/sb_core/conn_sync.rs @@ -14,6 +14,3 @@ impl ConnWatcher { self.0.clone() } } - -#[derive(Clone)] -pub struct DenoRuntimeDropToken(pub CancellationToken); diff --git a/crates/sb_core/net.rs b/crates/sb_core/net.rs index 3a0308a1f..3275ab8b2 100644 --- a/crates/sb_core/net.rs +++ b/crates/sb_core/net.rs @@ -1,3 +1,4 @@ +use base_rt::DenoRuntimeDropToken; use deno_core::error::bad_resource; use deno_core::error::AnyError; use deno_core::op2; @@ -20,8 +21,6 @@ use tokio_util::sync::CancellationToken; use tracing::span; use tracing::Level; -use crate::conn_sync::DenoRuntimeDropToken; - pub struct TokioDuplexResource { id: usize, rw: AsyncRefCell, From b1673110778ebab09f1c0d7c8bd030a11fe3c68a Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 9 Aug 2024 01:18:06 +0000 Subject: [PATCH 19/27] stamp: rename mod --- crates/base/src/lib.rs | 2 +- crates/base/src/{tracing.rs => tracing_subscriber.rs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename crates/base/src/{tracing.rs => tracing_subscriber.rs} (100%) diff --git a/crates/base/src/lib.rs b/crates/base/src/lib.rs index 0cbabc7a6..bbd29a557 100644 --- a/crates/base/src/lib.rs +++ b/crates/base/src/lib.rs @@ -16,4 +16,4 @@ pub use sb_core::cache::CacheSetting; pub use sb_graph::DecoratorType; #[cfg(test)] -mod tracing; +mod tracing_subscriber; diff --git a/crates/base/src/tracing.rs b/crates/base/src/tracing_subscriber.rs similarity index 100% rename from crates/base/src/tracing.rs rename to crates/base/src/tracing_subscriber.rs From 6a417459dceae535188085032b2fc8524956153a Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 9 Aug 2024 01:20:36 +0000 Subject: [PATCH 20/27] perf: makes run inference task in the blocking thread pool --- crates/base/src/deno_runtime.rs | 23 ++++---- crates/base_rt/src/lib.rs | 94 ++++++++++++++++++++++++++++++++- crates/sb_ai/lib.rs | 35 ++++++++---- crates/sb_ai/session.rs | 4 +- 4 files changed, 127 insertions(+), 29 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 37412cfc4..211dc4773 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -4,10 +4,11 @@ use crate::rt_worker::worker::DuplexStreamEntry; use crate::utils::path::find_up; use crate::utils::units::{bytes_to_display, mib_to_bytes}; -use anyhow::{anyhow, bail, Context, Error}; +use anyhow::{anyhow, bail, Error}; use base_mem_check::{MemCheckState, WorkerHeapStatistics}; +use base_rt::DenoRuntimeDropToken; +use base_rt::{get_current_cpu_time_ns, BlockingScopeCPUUsage}; use cooked_waker::{IntoWaker, WakeRef}; -use cpu_timer::get_thread_time; use ctor::ctor; use deno_core::error::AnyError; use deno_core::url::Url; @@ -22,9 +23,8 @@ use deno_tls::rustls::RootCertStore; use deno_tls::RootCertStoreProvider; use futures_util::future::poll_fn; use futures_util::task::AtomicWaker; -use log::{error, trace}; +use log::error; use once_cell::sync::{Lazy, OnceCell}; -use base_rt::DenoRuntimeDropToken; use sb_core::http::sb_core_http; use sb_core::http_start::sb_core_http_start; use sb_core::util::sync::AtomicFlag; @@ -45,7 +45,7 @@ use std::time::Duration; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::time::interval; use tokio_util::sync::{CancellationToken, PollSemaphore}; -use tracing::debug; +use tracing::{debug, trace}; use crate::snapshot; use event_worker::events::{EventMetadata, WorkerEventWithMetadata}; @@ -928,11 +928,10 @@ where mem_state.waker.register(waker); trace!( - "name: {:?}, thread_id: {:?}, accumulated_cpu_time: {}ms, malloced: {}", - name.as_ref(), - thread_id, - *accumulated_cpu_time_ns / 1_000_000, - bytes_to_display(total_malloced_bytes as u64) + ?name, + ?thread_id, + accumulated_cpu_time_ms = *accumulated_cpu_time_ns / 1_000_000, + malloced_mb = bytes_to_display(total_malloced_bytes as u64) ); } @@ -1025,10 +1024,6 @@ pub fn import_meta_resolve_callback( loader.resolve(&specifier, &referrer, ResolutionKind::DynamicImport) } -fn get_current_cpu_time_ns() -> Result { - get_thread_time().context("can't get current thread time") -} - fn with_cpu_metrics_guard<'l, F, R>( thread_id: ThreadId, maybe_cpu_usage_metrics_tx: &'l Option>, diff --git a/crates/base_rt/src/lib.rs b/crates/base_rt/src/lib.rs index 51848da3d..9f82310bb 100644 --- a/crates/base_rt/src/lib.rs +++ b/crates/base_rt/src/lib.rs @@ -1,7 +1,17 @@ -use std::num::NonZeroUsize; +use std::{ + num::NonZeroUsize, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, +}; +use cpu_timer::get_thread_time; +use deno_core::{anyhow::Context, error::AnyError, OpState, V8CrossThreadTaskSpawner}; use once_cell::sync::Lazy; +use tokio::{runtime::Handle, sync::oneshot}; use tokio_util::sync::CancellationToken; +use tracing::{debug, debug_span}; pub mod error; @@ -71,3 +81,85 @@ impl std::ops::Deref for DenoRuntimeDropToken { } } +pub fn get_current_cpu_time_ns() -> Result { + get_thread_time().context("can't get current thread time") +} + +pub trait BlockingScopeCPUUsageMetricExt { + fn spawn_cpu_accumul_blocking_scope(self, scope_fn: F) -> tokio::task::JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; +} + +#[derive(Default)] +pub struct BlockingScopeCPUUsage(Arc); + +impl BlockingScopeCPUUsage { + pub fn get_cpu_usage_ns_and_reset(state: &mut OpState) -> i64 { + let Some(storage) = state.try_borrow_mut::() else { + return 0; + }; + + storage.0.swap(0, Ordering::SeqCst) + } +} + +impl BlockingScopeCPUUsageMetricExt for &mut OpState { + fn spawn_cpu_accumul_blocking_scope(self, scope_fn: F) -> tokio::task::JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let drop_token = self.borrow::().clone(); + let cross_thread_spawner = self.borrow::().clone(); + let usage = { + if let Some(store) = self.try_borrow_mut::() { + store + } else { + self.put(BlockingScopeCPUUsage::default()); + self.borrow_mut() + } + } + .0 + .clone(); + + tokio::task::spawn_blocking(move || { + let _span = debug_span!("cpu_accumul_scope").entered(); + let handle = Handle::current(); + + let (tx, rx) = oneshot::channel::<()>(); + let current_cpu_time_ns = get_current_cpu_time_ns().unwrap_or_default(); + let result = scope_fn(); + let cpu_time_after_drop_ns = get_current_cpu_time_ns().unwrap_or(current_cpu_time_ns); + let diff_cpu_time_ns = std::cmp::max(0, cpu_time_after_drop_ns - current_cpu_time_ns); + + usage.fetch_add(diff_cpu_time_ns, Ordering::SeqCst); + cross_thread_spawner.spawn({ + let span = debug_span!("in v8 stack"); + move |_| { + let _span = span.entered(); + tx.send(()).unwrap(); + } + }); + + handle.block_on({ + let span = debug_span!("wait v8 task done"); + async move { + let _span = span.entered(); + tokio::select! { + _ = rx => {} + _ = drop_token.cancelled() => { + debug!( + js_runtime_dropped = true, + unreported_blocking_cpu_time_ms = diff_cpu_time_ns / 1_000_000 + ); + } + } + } + }); + + result + }) + } +} diff --git a/crates/sb_ai/lib.rs b/crates/sb_ai/lib.rs index c79f26424..03014271b 100644 --- a/crates/sb_ai/lib.rs +++ b/crates/sb_ai/lib.rs @@ -2,9 +2,10 @@ mod onnx; mod session; use anyhow::{bail, Error}; +use base_rt::BlockingScopeCPUUsageMetricExt; use deno_core::error::AnyError; -use deno_core::{op2, V8CrossThreadTaskSpawner, V8TaskSpawner}; -use deno_core::{serde_json, OpState}; +use deno_core::OpState; +use deno_core::{op2, JsRuntime, V8CrossThreadTaskSpawner, V8TaskSpawner}; use log::error; use ndarray::{Array1, Array2, ArrayView3, Axis, Ix3}; use ndarray_linalg::norm::{normalize, NormalizeAxis}; @@ -56,11 +57,19 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { let models_dir = std::env::var("SB_AI_MODELS_DIR").unwrap_or("/etc/sb_ai/models".to_string()); - let (req_tx, mut req_rx) = mpsc::unbounded_channel::(); + spawner.spawn(move |scope| { + let state = JsRuntime::op_state_from(scope); + let mut state = state.borrow_mut(); - state.put::>(req_tx); + let mut req_rx = { + let (req_tx, req_rx) = mpsc::unbounded_channel::(); + let _ = state.try_take::>(); + + state.put::>(req_tx); + + req_rx + }; - spawner.spawn(move |_| { let session = load_session_from_file(Path::new(&models_dir).join("gte-small").join("model.onnx")); @@ -162,15 +171,19 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { let req = req.unwrap(); - cross_thread_spawner.spawn(move |_| { - let result = run_inference_fn(req.prompt, req.mean_pool, req.normalize); + cross_thread_spawner.spawn(move |state| { + JsRuntime::op_state_from(state) + .borrow_mut() + .spawn_cpu_accumul_blocking_scope(move || { + let result = run_inference_fn(req.prompt, req.mean_pool, req.normalize); - if req.result_tx.send(result).is_err() { - error!("sb_ai: failed to send inference results (channel error)"); - }; + if req.result_tx.send(result).is_err() { + error!("sb_ai: failed to send inference results (channel error)"); + }; + }); }); } - })); + })) }); Ok(()) diff --git a/crates/sb_ai/session.rs b/crates/sb_ai/session.rs index 42b59b5db..685cf70e1 100644 --- a/crates/sb_ai/session.rs +++ b/crates/sb_ai/session.rs @@ -1,13 +1,11 @@ use deno_core::error::AnyError; -use deno_core::serde_json; use once_cell::sync::Lazy; -use serde::Serialize; use std::collections::HashMap; use std::sync::Mutex; use std::usize; use std::{path::PathBuf, sync::Arc}; -use anyhow::{anyhow, Context, Error}; +use anyhow::{anyhow, Error}; use ort::{ CPUExecutionProvider, ExecutionProviderDispatch, GraphOptimizationLevel, Session, SessionBuilder, From 4c986e37bfdebb42db5a90d229e9bf2874b6957f Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 15 Aug 2024 01:50:43 +0000 Subject: [PATCH 21/27] stamp: polishing --- crates/base_rt/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/base_rt/src/lib.rs b/crates/base_rt/src/lib.rs index 9f82310bb..e75160d8a 100644 --- a/crates/base_rt/src/lib.rs +++ b/crates/base_rt/src/lib.rs @@ -11,7 +11,7 @@ use deno_core::{anyhow::Context, error::AnyError, OpState, V8CrossThreadTaskSpaw use once_cell::sync::Lazy; use tokio::{runtime::Handle, sync::oneshot}; use tokio_util::sync::CancellationToken; -use tracing::{debug, debug_span}; +use tracing::{debug, debug_span, Instrument}; pub mod error; @@ -144,9 +144,7 @@ impl BlockingScopeCPUUsageMetricExt for &mut OpState { }); handle.block_on({ - let span = debug_span!("wait v8 task done"); async move { - let _span = span.entered(); tokio::select! { _ = rx => {} _ = drop_token.cancelled() => { @@ -157,6 +155,7 @@ impl BlockingScopeCPUUsageMetricExt for &mut OpState { } } } + .instrument(debug_span!("wait v8 task done")) }); result From a9f24ba48fb35dffd3f1dbbd7deaa3f95519c431 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 12 Jul 2024 00:29:05 +0000 Subject: [PATCH 22/27] chore: update dependencies --- crates/base_rt/Cargo.toml | 1 - crates/sb_ai/Cargo.toml | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/base_rt/Cargo.toml b/crates/base_rt/Cargo.toml index 11f4720d5..bb732aa73 100644 --- a/crates/base_rt/Cargo.toml +++ b/crates/base_rt/Cargo.toml @@ -11,4 +11,3 @@ tokio.workspace = true once_cell.workspace = true tracing.workspace = true tokio-util = { workspace = true, features = ["rt"] } - diff --git a/crates/sb_ai/Cargo.toml b/crates/sb_ai/Cargo.toml index fc146050a..d48320977 100644 --- a/crates/sb_ai/Cargo.toml +++ b/crates/sb_ai/Cargo.toml @@ -36,9 +36,7 @@ ndarray = "0.15" ndarray-linalg = "0.15" rand = "0.8" convert_case = "0.6" -tokenizers = { version = ">=0.13.4", default-features = false, features = [ - "onig", -] } +tokenizers = { version = ">=0.13.4", default-features = false, features = [ "onig" ] } ort = { git = "https://github.com/pykeio/ort", default-features = false, features = [ "ndarray", From faf75387d0e1975bf8f8195557e1eaabd6e54e3b Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 22 Aug 2024 01:34:42 +0000 Subject: [PATCH 23/27] stamp: adjust cpu metrics guard --- crates/base/src/deno_runtime.rs | 54 +++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 211dc4773..1ae89c2e2 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -15,7 +15,7 @@ use deno_core::url::Url; use deno_core::v8::{GCCallbackFlags, GCType, HeapStatistics, Isolate}; use deno_core::{ located_script_name, serde_json, JsRuntime, ModuleCodeString, ModuleId, ModuleLoader, - ModuleSpecifier, PollEventLoopOptions, ResolutionKind, RuntimeOptions, + ModuleSpecifier, OpState, PollEventLoopOptions, ResolutionKind, RuntimeOptions, }; use deno_http::DefaultHttpPropertyExtractor; use deno_tls::deno_native_certs::load_native_certs; @@ -31,12 +31,14 @@ use sb_core::util::sync::AtomicFlag; use sb_fs::static_fs::StaticFs; use serde::Serialize; use std::borrow::Cow; +use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; +use std::rc::Rc; use std::str::FromStr; use std::sync::{Arc, RwLock}; use std::task::Poll; @@ -45,7 +47,7 @@ use std::time::Duration; use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore}; use tokio::time::interval; use tokio_util::sync::{CancellationToken, PollSemaphore}; -use tracing::{debug, trace}; +use tracing::{debug, debug_span, instrument, trace, Instrument}; use crate::snapshot; use event_worker::events::{EventMetadata, WorkerEventWithMetadata}; @@ -737,11 +739,12 @@ where let current_thread_id = std::thread::current().id(); let mut accumulated_cpu_time_ns = 0i64; - let has_inspector = self.inspector().is_some(); + let span = debug_span!("runtime", ?name, thread_id = ?current_thread_id); + let inspector = self.inspector(); let mut mod_result_rx = unsafe { self.js_runtime.v8_isolate().enter(); - if has_inspector { + if inspector.is_some() { let is_terminated = self.is_terminated.clone(); let mut this = scopeguard::guard_on_unwind(&mut *self, |this| { this.js_runtime.v8_isolate().exit(); @@ -774,11 +777,13 @@ where with_cpu_metrics_guard( current_thread_id, + js_runtime.op_state(), &maybe_cpu_usage_metrics_tx, &mut accumulated_cpu_time_ns, || js_runtime.mod_evaluate(self.main_module_id), ) - }; + } + .instrument(span.clone()); macro_rules! get_accumulated_cpu_time_ms { () => { @@ -787,12 +792,13 @@ where } { - let event_loop_fut = self.run_event_loop( - name.as_deref(), - current_thread_id, - &maybe_cpu_usage_metrics_tx, - &mut accumulated_cpu_time_ns, - ); + let event_loop_fut = self + .run_event_loop( + current_thread_id, + &maybe_cpu_usage_metrics_tx, + &mut accumulated_cpu_time_ns, + ) + .instrument(span.clone()); let mod_result = tokio::select! { // Not using biased mode leads to non-determinism for relatively simple @@ -821,11 +827,11 @@ where if let Err(err) = self .run_event_loop( - name.as_deref(), current_thread_id, &maybe_cpu_usage_metrics_tx, &mut accumulated_cpu_time_ns, ) + .instrument(span) .await { return ( @@ -839,7 +845,6 @@ where fn run_event_loop<'l>( &'l mut self, - name: Option<&'l str>, #[allow(unused_variables)] current_thread_id: ThreadId, maybe_cpu_usage_metrics_tx: &'l Option>, accumulated_cpu_time_ns: &'l mut i64, @@ -880,6 +885,7 @@ where let js_runtime = &mut this.js_runtime; let cpu_metrics_guard = get_cpu_metrics_guard( thread_id, + js_runtime.op_state(), maybe_cpu_usage_metrics_tx, accumulated_cpu_time_ns, ); @@ -927,12 +933,7 @@ where mem_state.waker.register(waker); - trace!( - ?name, - ?thread_id, - accumulated_cpu_time_ms = *accumulated_cpu_time_ns / 1_000_000, - malloced_mb = bytes_to_display(total_malloced_bytes as u64) - ); + trace!(malloced_mb = bytes_to_display(total_malloced_bytes as u64)); } // NOTE(Nyannyacha): If tasks are empty or V8 is not evaluating the @@ -985,8 +986,11 @@ where })); } + #[instrument(level = "debug", skip(self))] fn wait_for_inspector_session(&mut self) { + debug!(has_inspector = self.maybe_inspector.is_some()); if let Some(inspector) = self.maybe_inspector.as_ref() { + debug!(addr = %inspector.server.host, server.inspector = ?inspector.option); let inspector_impl = self.js_runtime.inspector(); let mut inspector_impl_ref = inspector_impl.borrow_mut(); @@ -1026,6 +1030,7 @@ pub fn import_meta_resolve_callback( fn with_cpu_metrics_guard<'l, F, R>( thread_id: ThreadId, + op_state: Rc>, maybe_cpu_usage_metrics_tx: &'l Option>, accumulated_cpu_time_ns: &'l mut i64, work_fn: F, @@ -1035,6 +1040,7 @@ where { let _cpu_metrics_guard = get_cpu_metrics_guard( thread_id, + op_state, maybe_cpu_usage_metrics_tx, accumulated_cpu_time_ns, ); @@ -1044,6 +1050,7 @@ where fn get_cpu_metrics_guard<'l>( thread_id: ThreadId, + op_state: Rc>, maybe_cpu_usage_metrics_tx: &'l Option>, accumulated_cpu_time_ns: &'l mut i64, ) -> scopeguard::ScopeGuard<(), impl FnOnce(()) + 'l> { @@ -1061,14 +1068,23 @@ fn get_cpu_metrics_guard<'l>( debug_assert_eq!(thread_id, std::thread::current().id()); let cpu_time_after_drop_ns = get_current_cpu_time_ns().unwrap_or(current_cpu_time_ns); + let blocking_cpu_time_ns = + BlockingScopeCPUUsage::get_cpu_usage_ns_and_reset(&mut op_state.borrow_mut()); + let diff_cpu_time_ns = cpu_time_after_drop_ns - current_cpu_time_ns; *accumulated_cpu_time_ns += diff_cpu_time_ns; + *accumulated_cpu_time_ns += blocking_cpu_time_ns; send_cpu_metrics_fn(CPUUsageMetrics::Leave(CPUUsage { accumulated: *accumulated_cpu_time_ns, diff: diff_cpu_time_ns, })); + + debug!( + accumulated_cpu_time_ms = *accumulated_cpu_time_ns / 1_000_000, + blocking_cpu_time_ms = blocking_cpu_time_ns / 1_000_000, + ); }) } From 42f1ebd22a84d9a33e5b471b57c5b1e2cc8c33f0 Mon Sep 17 00:00:00 2001 From: kallebysantos Date: Mon, 28 Oct 2024 11:17:39 +0000 Subject: [PATCH 24/27] stamp: clippy --- crates/sb_ai/session.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/sb_ai/session.rs b/crates/sb_ai/session.rs index 685cf70e1..89fc359a3 100644 --- a/crates/sb_ai/session.rs +++ b/crates/sb_ai/session.rs @@ -2,7 +2,6 @@ use deno_core::error::AnyError; use once_cell::sync::Lazy; use std::collections::HashMap; use std::sync::Mutex; -use std::usize; use std::{path::PathBuf, sync::Arc}; use anyhow::{anyhow, Error}; From 8601949ea43f9e1c4c255f30f2acac7c7c6049b9 Mon Sep 17 00:00:00 2001 From: kallebysantos Date: Mon, 28 Oct 2024 13:37:43 +0000 Subject: [PATCH 25/27] feat: add GPU support Signed-off-by: kallebysantos --- Dockerfile | 45 +++++++++++++++++++++++++++++++++++------ crates/sb_ai/session.rs | 25 ++++++++++++++++++++--- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index 77c3734a7..1b748796b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,20 +26,53 @@ RUN objcopy --strip-debug \ --add-gnu-debuglink=/root/edge-runtime.debug \ /root/edge-runtime -RUN ./scripts/install_onnx.sh $ONNXRUNTIME_VERSION $TARGETPLATFORM /root/onnxruntime -RUN ./scripts/download_models.sh -FROM debian:bookworm-slim +# Application runtime without ONNX +FROM debian:bookworm-slim as edge-runtime-base RUN apt-get update && apt-get install -y libssl-dev && rm -rf /var/lib/apt/lists/* RUN apt-get remove -y perl && apt-get autoremove -y COPY --from=builder /root/edge-runtime /usr/local/bin/edge-runtime COPY --from=builder /root/edge-runtime.debug /usr/local/bin/edge-runtime.debug -COPY --from=builder /root/onnxruntime /usr/local/bin/onnxruntime -COPY --from=builder /usr/src/edge-runtime/models /etc/sb_ai/models ENV ORT_DYLIB_PATH=/usr/local/bin/onnxruntime/lib/libonnxruntime.so -ENV SB_AI_MODELS_DIR=/etc/sb_ai/models + + +# ONNX Runtime provider +# Application runtime with ONNX +FROM builder as ort +RUN ./scripts/install_onnx.sh $ONNXRUNTIME_VERSION $TARGETPLATFORM /root/onnxruntime + + +# ONNX Runtime CUDA provider +# Application runtime with ONNX CUDA +FROM builder as ort-cuda +RUN ./scripts/install_onnx.sh $ONNXRUNTIME_VERSION $TARGETPLATFORM /root/onnxruntime --gpu + + +FROM builder as preload-models +RUN ./scripts/download_models.sh + + +# With CUDA +FROM nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04 as edge-runtime-cuda + +COPY --from=edge-runtime-base /usr/local/bin/edge-runtime /usr/local/bin/edge-runtime +COPY --from=builder /root/edge-runtime.debug /usr/local/bin/edge-runtime.debug +COPY --from=ort-cuda /root/onnxruntime /usr/local/bin/onnxruntime +COPY --from=preload-models /usr/src/edge-runtime/models /etc/sb_ai/models + +ENV ORT_DYLIB_PATH=/usr/local/bin/onnxruntime/lib/libonnxruntime.so +ENV NVIDIA_VISIBLE_DEVICES=all +ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility + +ENTRYPOINT ["edge-runtime"] + + +# Base +FROM edge-runtime-base as edge-runtime +COPY --from=ort /root/onnxruntime /usr/local/bin/onnxruntime +COPY --from=preload-models /usr/src/edge-runtime/models /etc/sb_ai/models ENTRYPOINT ["edge-runtime"] diff --git a/crates/sb_ai/session.rs b/crates/sb_ai/session.rs index 89fc359a3..462b0e1a8 100644 --- a/crates/sb_ai/session.rs +++ b/crates/sb_ai/session.rs @@ -3,11 +3,12 @@ use once_cell::sync::Lazy; use std::collections::HashMap; use std::sync::Mutex; use std::{path::PathBuf, sync::Arc}; +use tracing::debug; use anyhow::{anyhow, Error}; use ort::{ - CPUExecutionProvider, ExecutionProviderDispatch, GraphOptimizationLevel, Session, - SessionBuilder, + CPUExecutionProvider, CUDAExecutionProvider, ExecutionProvider, ExecutionProviderDispatch, + GraphOptimizationLevel, Session, SessionBuilder, }; use crate::onnx::ensure_onnx_env_init; @@ -49,6 +50,24 @@ fn cpu_execution_provider() -> Box Box> { + let cuda = CUDAExecutionProvider::default(); + let providers = match cuda.is_available() { + Ok(is_cuda_available) => { + debug!(cuda_support = is_cuda_available); + if is_cuda_available { + vec![cuda.build()] + } else { + vec![] + } + } + + _ => vec![], + }; + + Box::new(providers.into_iter().chain(cpu_execution_provider())) +} + fn create_session(model_bytes: &[u8]) -> Result, Error> { let session = { if let Some(err) = ensure_onnx_env_init() { @@ -56,7 +75,7 @@ fn create_session(model_bytes: &[u8]) -> Result, Error> { } get_session_builder()? - .with_execution_providers(cpu_execution_provider())? + .with_execution_providers(cuda_execution_provider())? .commit_from_memory(model_bytes)? }; From bfc366110ba1eec1088cd530ef49f7e19a0442ff Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 1 Nov 2024 01:18:17 +0000 Subject: [PATCH 26/27] chore: apply format --- Dockerfile | 14 +- crates/base_rt/Cargo.toml | 1 + examples/main/index.ts | 354 +++++++++++++++++++------------------- 3 files changed, 185 insertions(+), 184 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1b748796b..1938ad0d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,16 +15,16 @@ WORKDIR /usr/src/edge-runtime COPY . . RUN --mount=type=cache,target=/usr/local/cargo/registry,id=${TARGETPLATFORM} --mount=type=cache,target=/usr/src/edge-runtime/target,id=${TARGETPLATFORM} \ - GIT_V_TAG=${GIT_V_VERSION} cargo build --profile ${PROFILE} --features "${FEATURES}" && \ - mv /usr/src/edge-runtime/target/${PROFILE}/edge-runtime /root + GIT_V_TAG=${GIT_V_VERSION} cargo build --profile ${PROFILE} --features "${FEATURES}" && \ + mv /usr/src/edge-runtime/target/${PROFILE}/edge-runtime /root RUN objcopy --compress-debug-sections \ - --only-keep-debug \ - /root/edge-runtime \ - /root/edge-runtime.debug + --only-keep-debug \ + /root/edge-runtime \ + /root/edge-runtime.debug RUN objcopy --strip-debug \ - --add-gnu-debuglink=/root/edge-runtime.debug \ - /root/edge-runtime + --add-gnu-debuglink=/root/edge-runtime.debug \ + /root/edge-runtime # Application runtime without ONNX diff --git a/crates/base_rt/Cargo.toml b/crates/base_rt/Cargo.toml index bb732aa73..560b4680f 100644 --- a/crates/base_rt/Cargo.toml +++ b/crates/base_rt/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" deno_core.workspace = true cpu_timer = { version = "0.1.0", path = "../cpu_timer" } + tokio.workspace = true once_cell.workspace = true tracing.workspace = true diff --git a/examples/main/index.ts b/examples/main/index.ts index ee1c4023e..0254ff192 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -10,189 +10,189 @@ console.log('main function started'); // cleanup unused sessions every 30s // setInterval(async () => { -// const { activeUserWorkersCount } = await EdgeRuntime.getRuntimeMetrics(); -// if (activeUserWorkersCount > 0) { -// return; -// } -// try { -// const cleanupCount = await EdgeRuntime.ai.tryCleanupUnusedSession(); -// if (cleanupCount == 0) { -// return; +// const { activeUserWorkersCount } = await EdgeRuntime.getRuntimeMetrics(); +// if (activeUserWorkersCount > 0) { +// return; +// } +// try { +// const cleanupCount = await EdgeRuntime.ai.tryCleanupUnusedSession(); +// if (cleanupCount == 0) { +// return; +// } +// console.log('EdgeRuntime.ai.tryCleanupUnusedSession', cleanupCount); +// } catch (e) { +// console.error(e.toString()); // } -// console.log('EdgeRuntime.ai.tryCleanupUnusedSession', cleanupCount); -// } catch (e) { -// console.error(e.toString()); -// } // }, 30 * 1000); Deno.serve(async (req: Request) => { - const headers = new Headers({ - 'Content-Type': 'application/json', - }); - - const url = new URL(req.url); - const { pathname } = url; - - // handle health checks - if (pathname === '/_internal/health') { - return new Response( - JSON.stringify({ 'message': 'ok' }), - { - status: STATUS_CODE.OK, - headers, - }, - ); - } - - // if (pathname === '/_internal/segfault') { - // EdgeRuntime.raiseSegfault(); - // return new Response( - // JSON.stringify({ 'message': 'ok' }), - // { - // status: STATUS_CODE.OK, - // headers, - // }, - // ); - // } - - if (pathname === '/_internal/metric') { - const metric = await EdgeRuntime.getRuntimeMetrics(); - return Response.json(metric); - } - - // NOTE: You can test WebSocket in the main worker by uncommenting below. - // if (pathname === '/_internal/ws') { - // const upgrade = req.headers.get("upgrade") || ""; - - // if (upgrade.toLowerCase() != "websocket") { - // return new Response("request isn't trying to upgrade to websocket."); - // } - - // const { socket, response } = Deno.upgradeWebSocket(req); - - // socket.onopen = () => console.log("socket opened"); - // socket.onmessage = (e) => { - // console.log("socket message:", e.data); - // socket.send(new Date().toString()); - // }; - - // socket.onerror = e => console.log("socket errored:", e.message); - // socket.onclose = () => console.log("socket closed"); - - // return response; // 101 (Switching Protocols) - // } - - const REGISTRY_PREFIX = '/_internal/registry'; - if (pathname.startsWith(REGISTRY_PREFIX)) { - return await handleRegistryRequest(REGISTRY_PREFIX, req); - } - - const path_parts = pathname.split('/'); - const service_name = path_parts[1]; - - if (!service_name || service_name === '') { - const error = { msg: 'missing function name in request' }; - return new Response( - JSON.stringify(error), - { status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } }, - ); - } - - const servicePath = `./examples/${service_name}`; - // console.error(`serving the request with ${servicePath}`); - - const createWorker = async () => { - const memoryLimitMb = 150; - const workerTimeoutMs = 5 * 60 * 1000; - const noModuleCache = false; - - // you can provide an import map inline - // const inlineImportMap = { - // imports: { - // "std/": "https://deno.land/std@0.131.0/", - // "cors": "./examples/_shared/cors.ts" - // } + const headers = new Headers({ + 'Content-Type': 'application/json', + }); + + const url = new URL(req.url); + const { pathname } = url; + + // handle health checks + if (pathname === '/_internal/health') { + return new Response( + JSON.stringify({ 'message': 'ok' }), + { + status: STATUS_CODE.OK, + headers, + }, + ); + } + + // if (pathname === '/_internal/segfault') { + // EdgeRuntime.raiseSegfault(); + // return new Response( + // JSON.stringify({ 'message': 'ok' }), + // { + // status: STATUS_CODE.OK, + // headers, + // }, + // ); // } - // const importMapPath = `data:${encodeURIComponent(JSON.stringify(importMap))}?${encodeURIComponent('/home/deno/functions/test')}`; - const importMapPath = null; - const envVarsObj = Deno.env.toObject(); - const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); - const forceCreate = false; - const netAccessDisabled = false; - - // load source from an eszip - //const maybeEszip = await Deno.readFile('./bin.eszip'); - //const maybeEntrypoint = 'file:///src/index.ts'; - - // const maybeEntrypoint = 'file:///src/index.ts'; - // or load module source from an inline module - // const maybeModuleCode = 'Deno.serve((req) => new Response("Hello from Module Code"));'; - // - const cpuTimeSoftLimitMs = 10000; - const cpuTimeHardLimitMs = 20000; - - return await EdgeRuntime.userWorkers.create({ - servicePath, - memoryLimitMb, - workerTimeoutMs, - noModuleCache, - importMapPath, - envVars, - forceCreate, - netAccessDisabled, - cpuTimeSoftLimitMs, - cpuTimeHardLimitMs, - // maybeEszip, - // maybeEntrypoint, - // maybeModuleCode, - }); - }; - - const callWorker = async () => { - try { - // If a worker for the given service path already exists, - // it will be reused by default. - // Update forceCreate option in createWorker to force create a new worker for each request. - const worker = await createWorker(); - const controller = new AbortController(); - - const signal = controller.signal; - // Optional: abort the request after a timeout - //setTimeout(() => controller.abort(), 2 * 60 * 1000); - - return await worker.fetch(req, { signal }); - } catch (e) { - console.error(e); - - if (e instanceof Deno.errors.WorkerRequestCancelled) { - headers.append('Connection', 'close'); - - // XXX(Nyannyacha): I can't think right now how to re-poll - // inside the worker pool without exposing the error to the - // surface. - - // It is satisfied when the supervisor that handled the original - // request terminated due to reaches such as CPU time limit or - // Wall-clock limit. - // - // The current request to the worker has been canceled due to - // some internal reasons. We should repoll the worker and call - // `fetch` again. - - // return await callWorker(); - } - - const error = { msg: e.toString() }; - return new Response( - JSON.stringify(error), - { - status: STATUS_CODE.InternalServerError, - headers, - }, - ); + if (pathname === '/_internal/metric') { + const metric = await EdgeRuntime.getRuntimeMetrics(); + return Response.json(metric); + } + + // NOTE: You can test WebSocket in the main worker by uncommenting below. + // if (pathname === '/_internal/ws') { + // const upgrade = req.headers.get("upgrade") || ""; + + // if (upgrade.toLowerCase() != "websocket") { + // return new Response("request isn't trying to upgrade to websocket."); + // } + + // const { socket, response } = Deno.upgradeWebSocket(req); + + // socket.onopen = () => console.log("socket opened"); + // socket.onmessage = (e) => { + // console.log("socket message:", e.data); + // socket.send(new Date().toString()); + // }; + + // socket.onerror = e => console.log("socket errored:", e.message); + // socket.onclose = () => console.log("socket closed"); + + // return response; // 101 (Switching Protocols) + // } + + const REGISTRY_PREFIX = '/_internal/registry'; + if (pathname.startsWith(REGISTRY_PREFIX)) { + return await handleRegistryRequest(REGISTRY_PREFIX, req); } - }; - return callWorker(); + const path_parts = pathname.split('/'); + const service_name = path_parts[1]; + + if (!service_name || service_name === '') { + const error = { msg: 'missing function name in request' }; + return new Response( + JSON.stringify(error), + { status: STATUS_CODE.BadRequest, headers: { 'Content-Type': 'application/json' } }, + ); + } + + const servicePath = `./examples/${service_name}`; + // console.error(`serving the request with ${servicePath}`); + + const createWorker = async () => { + const memoryLimitMb = 150; + const workerTimeoutMs = 5 * 60 * 1000; + const noModuleCache = false; + + // you can provide an import map inline + // const inlineImportMap = { + // imports: { + // "std/": "https://deno.land/std@0.131.0/", + // "cors": "./examples/_shared/cors.ts" + // } + // } + + // const importMapPath = `data:${encodeURIComponent(JSON.stringify(importMap))}?${encodeURIComponent('/home/deno/functions/test')}`; + const importMapPath = null; + const envVarsObj = Deno.env.toObject(); + const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); + const forceCreate = false; + const netAccessDisabled = false; + + // load source from an eszip + //const maybeEszip = await Deno.readFile('./bin.eszip'); + //const maybeEntrypoint = 'file:///src/index.ts'; + + // const maybeEntrypoint = 'file:///src/index.ts'; + // or load module source from an inline module + // const maybeModuleCode = 'Deno.serve((req) => new Response("Hello from Module Code"));'; + // + const cpuTimeSoftLimitMs = 10000; + const cpuTimeHardLimitMs = 20000; + + return await EdgeRuntime.userWorkers.create({ + servicePath, + memoryLimitMb, + workerTimeoutMs, + noModuleCache, + importMapPath, + envVars, + forceCreate, + netAccessDisabled, + cpuTimeSoftLimitMs, + cpuTimeHardLimitMs, + // maybeEszip, + // maybeEntrypoint, + // maybeModuleCode, + }); + }; + + const callWorker = async () => { + try { + // If a worker for the given service path already exists, + // it will be reused by default. + // Update forceCreate option in createWorker to force create a new worker for each request. + const worker = await createWorker(); + const controller = new AbortController(); + + const signal = controller.signal; + // Optional: abort the request after a timeout + //setTimeout(() => controller.abort(), 2 * 60 * 1000); + + return await worker.fetch(req, { signal }); + } catch (e) { + console.error(e); + + if (e instanceof Deno.errors.WorkerRequestCancelled) { + headers.append('Connection', 'close'); + + // XXX(Nyannyacha): I can't think right now how to re-poll + // inside the worker pool without exposing the error to the + // surface. + + // It is satisfied when the supervisor that handled the original + // request terminated due to reaches such as CPU time limit or + // Wall-clock limit. + // + // The current request to the worker has been canceled due to + // some internal reasons. We should repoll the worker and call + // `fetch` again. + + // return await callWorker(); + } + + const error = { msg: e.toString() }; + return new Response( + JSON.stringify(error), + { + status: STATUS_CODE.InternalServerError, + headers, + }, + ); + } + }; + + return callWorker(); }); From 53b2100602b46f91d5ddb902c841b4292a75ce05 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 1 Nov 2024 01:20:09 +0000 Subject: [PATCH 27/27] stamp: insert tracing macros at some points --- crates/sb_ai/lib.rs | 11 +++++------ crates/sb_ai/onnx.rs | 5 ++--- crates/sb_ai/session.rs | 6 +++++- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/sb_ai/lib.rs b/crates/sb_ai/lib.rs index 03014271b..edef73833 100644 --- a/crates/sb_ai/lib.rs +++ b/crates/sb_ai/lib.rs @@ -6,7 +6,6 @@ use base_rt::BlockingScopeCPUUsageMetricExt; use deno_core::error::AnyError; use deno_core::OpState; use deno_core::{op2, JsRuntime, V8CrossThreadTaskSpawner, V8TaskSpawner}; -use log::error; use ndarray::{Array1, Array2, ArrayView3, Axis, Ix3}; use ndarray_linalg::norm::{normalize, NormalizeAxis}; use ort::inputs; @@ -19,7 +18,7 @@ use tokenizers::Tokenizer; use tokio::sync::mpsc; use tokio::task; -use tracing::trace_span; +use tracing::{error, trace_span}; deno_core::extension!( sb_ai, @@ -75,7 +74,7 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { if session.is_err() { let err = session.as_ref().unwrap_err(); - error!("sb_ai: failed to create session - {}", err); + error!(reason = ?err, "failed to create session"); return; } @@ -89,7 +88,7 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { if tokenizer.is_err() { let err = tokenizer.as_ref().unwrap_err(); - error!("sb_ai: failed to create tokenizer: {}", err); + error!(reason = ?err, "failed to create tokenizer"); return; } @@ -177,8 +176,8 @@ fn init_gte(state: &mut OpState) -> Result<(), Error> { .spawn_cpu_accumul_blocking_scope(move || { let result = run_inference_fn(req.prompt, req.mean_pool, req.normalize); - if req.result_tx.send(result).is_err() { - error!("sb_ai: failed to send inference results (channel error)"); + if let Err(err) = req.result_tx.send(result) { + error!(reason = ?err, "failed to send inference results"); }; }); }); diff --git a/crates/sb_ai/onnx.rs b/crates/sb_ai/onnx.rs index ea77ce5a7..c3800ab00 100644 --- a/crates/sb_ai/onnx.rs +++ b/crates/sb_ai/onnx.rs @@ -4,9 +4,8 @@ use std::sync::{ }; use ctor::ctor; -use log::error; use scopeguard::guard; -use tracing::instrument; +use tracing::{error, instrument}; static ONNX_INIT_ONNX_ENV_DONE: AtomicBool = AtomicBool::new(false); static ONNX_INIT_RESULT: Mutex>> = Mutex::new(None); @@ -48,7 +47,7 @@ fn init_onnx_env() { } Ok(Err(err)) => { - error!("sb_ai: failed to create environment: {}", err); + error!(reason = ?err, "failed to create environment"); let _ = guard1.insert(Arc::new(err)); } diff --git a/crates/sb_ai/session.rs b/crates/sb_ai/session.rs index 462b0e1a8..219b6b382 100644 --- a/crates/sb_ai/session.rs +++ b/crates/sb_ai/session.rs @@ -3,7 +3,7 @@ use once_cell::sync::Lazy; use std::collections::HashMap; use std::sync::Mutex; use std::{path::PathBuf, sync::Arc}; -use tracing::debug; +use tracing::{debug, instrument, trace}; use anyhow::{anyhow, Error}; use ort::{ @@ -84,6 +84,7 @@ fn create_session(model_bytes: &[u8]) -> Result, Error> { Ok(session) } +#[instrument(level = "debug", ret)] pub(crate) fn load_session_from_file( model_file_path: PathBuf, ) -> Result<(String, Arc), Error> { @@ -92,17 +93,20 @@ pub(crate) fn load_session_from_file( let mut sessions = SESSIONS.lock().unwrap(); if let Some(session) = sessions.get(&session_id) { + trace!(session_id, "use existing session"); return Ok((session_id, session.clone())); } let model_bytes = std::fs::read(model_file_path)?; let session = create_session(model_bytes.as_slice())?; + trace!(session_id, "new session"); sessions.insert(session_id.to_owned(), session.clone()); Ok((session_id, session)) } +#[instrument(level = "debug", ret)] pub fn cleanup() -> Result { let mut remove_counter = 0; {