Skip to content

Commit

Permalink
feat: reintroduce DuplexStream (#327)
Browse files Browse the repository at this point in the history
* feat: reintroduce `DuplexStream`

See #320

* stamp: add an example for chunked transfer encoding

* fix: defer pipe closure timing

* stamp: polishing

* fix: original request body should be able to abort when a request to the user worker has been cancelled

* stamp: adjust edge runtime arguments of `scripts/run.sh`

* stamp(k6): add a scenario for upload failure due to exceeding max size

* refactor: simplify the expression of connection lifetime using `CancellationToken`

* fix: update feature flag for `base` crate

* stamp: polishing

* chore: update `Dockerfile`

* chore: update file update examples

* chore(base): update dependencies

* chore: update `Cargo.lock`

* stamp: update deno file update example

* stamp: polishing

* stamp: add an integration test for request failure during multipart uploading

* stamp: don’t expose `Interrupted` error to outside

* chore(sb_workers): update dependencies

* chore: update `Cargo.lock`

* fix: cleanup request body unconditionally

* stamp: update oak file update example

* fix: propagate user worker side error properly

* stamp: cleanup tests codes and add more tests for file uploads

* fix: prevent stack overflow when dropping `Stream2`

* stamp: disambiguate local variable naming

* stamp: polishing

* stamp(k6): update a scenario

* stamp: update an oak file upload example

* stamp: update main service example
  • Loading branch information
nyannyacha committed May 7, 2024
1 parent 0648aa0 commit 2800bac
Show file tree
Hide file tree
Showing 26 changed files with 781 additions and 353 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ ARG GIT_V_VERSION
ARG ONNXRUNTIME_VERSION=1.17.0
RUN apt-get update && apt-get install -y llvm-dev libclang-dev clang cmake
WORKDIR /usr/src/edge-runtime

RUN --mount=type=cache,target=/usr/local/cargo/registry,id=${TARGETPLATFORM} \
cargo install cargo-strip
COPY . .

RUN --mount=type=cache,target=/usr/local/cargo/registry,id=${TARGETPLATFORM} --mount=type=cache,target=/usr/src/edge-runtime/target,id=${TARGETPLATFORM} \
GIT_V_TAG=${GIT_V_VERSION} cargo build --release && \
cargo strip && \
mv /usr/src/edge-runtime/target/release/edge-runtime /root

RUN ./scripts/install_onnx.sh $ONNXRUNTIME_VERSION $TARGETPLATFORM /root/libonnxruntime.so
RUN ./scripts/download_models.sh

Expand Down
6 changes: 5 additions & 1 deletion crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ cooked-waker = { version = "5" }

[dev-dependencies]
tokio-util = { workspace = true, features = ["rt", "compat"] }
reqwest = { workspace = true, features = ["multipart"] }
serial_test = { version = "3.0.0" }
async-tungstenite = { version = "0.25.0", default-features = false }
tungstenite = { version = "0.21.0", default-features = false, features = ["handshake"] }
Expand All @@ -85,6 +86,7 @@ sb_workers = { version = "0.1.0", path = "../sb_workers" }
sb_env = { version = "0.1.0", path = "../sb_env" }
sb_os = { version = "0.1.0", path = "../sb_os" }
sb_node = { version = "0.1.0", path = "../node" }
sb_ai = { version = "0.1.0", path = "../sb_ai" }
anyhow = { workspace = true }
bytes = { workspace = true }
deno_ast = { workspace = true }
Expand Down Expand Up @@ -112,4 +114,6 @@ deno_broadcast_channel.workspace = true
deno_core.workspace = true
deno_canvas.workspace = true
deno_webgpu.workspace = true
sb_ai = { version = "0.1.0", path = "../sb_ai" }

[features]
termination-signal-ext = []
27 changes: 13 additions & 14 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::inspector_server::Inspector;
use crate::rt_worker::rt;
use crate::rt_worker::supervisor::{CPUUsage, CPUUsageMetrics};
use crate::rt_worker::worker::UnixStreamEntry;
use crate::rt_worker::worker::DuplexStreamEntry;
use crate::utils::units::{bytes_to_display, mib_to_bytes};

use anyhow::{anyhow, bail, Context, Error};
Expand All @@ -24,7 +24,6 @@ use futures_util::future::poll_fn;
use futures_util::task::AtomicWaker;
use log::{error, trace};
use once_cell::sync::{Lazy, OnceCell};
use sb_core::conn_sync::ConnSync;
use sb_core::http::sb_core_http;
use sb_core::http_start::sb_core_http_start;
use sb_core::util::sync::AtomicFlag;
Expand All @@ -33,11 +32,10 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::ffi::c_void;
use std::fmt;
use std::os::fd::RawFd;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::{mpsc, watch, Notify};
use tokio::sync::{mpsc, Notify};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -524,7 +522,7 @@ impl DenoRuntime {
}

if conf.is_main_worker() || conf.is_user_worker() {
op_state.put::<HashMap<RawFd, watch::Receiver<ConnSync>>>(HashMap::new());
op_state.put::<HashMap<usize, CancellationToken>>(HashMap::new());
}

if conf.is_user_worker() {
Expand Down Expand Up @@ -595,14 +593,15 @@ impl DenoRuntime {

pub async fn run(
&mut self,
unix_stream_rx: mpsc::UnboundedReceiver<UnixStreamEntry>,
duplex_stream_rx: mpsc::UnboundedReceiver<DuplexStreamEntry>,
maybe_cpu_usage_metrics_tx: Option<mpsc::UnboundedSender<CPUUsageMetrics>>,
name: Option<String>,
) -> (Result<(), Error>, i64) {
{
let op_state_rc = self.js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
op_state.put::<mpsc::UnboundedReceiver<UnixStreamEntry>>(unix_stream_rx);

op_state.put::<mpsc::UnboundedReceiver<DuplexStreamEntry>>(duplex_stream_rx);

if self.conf.is_main_worker() {
op_state.put::<mpsc::UnboundedSender<UserWorkerMsgs>>(
Expand Down Expand Up @@ -886,7 +885,7 @@ extern "C" fn mem_check_gc_prologue_callback_fn(
#[cfg(test)]
mod test {
use crate::deno_runtime::DenoRuntime;
use crate::rt_worker::worker::UnixStreamEntry;
use crate::rt_worker::worker::DuplexStreamEntry;
use deno_core::{FastString, ModuleCodeString, PollEventLoopOptions};
use sb_graph::emitter::EmitterFactory;
use sb_graph::{generate_binary_eszip, EszipPayloadKind};
Expand Down Expand Up @@ -1466,8 +1465,8 @@ mod test {
let mut user_rt =
create_basic_user_runtime("./test_cases/array_buffers", 20, 1000, &[]).await;

let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
let (result, _) = user_rt.run(unix_stream_rx, None, None).await;
let (_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
let (result, _) = user_rt.run(duplex_stream_rx, None, None).await;

assert!(result.is_ok(), "expected no errors");

Expand All @@ -1481,8 +1480,8 @@ mod test {
let mut user_rt =
create_basic_user_runtime("./test_cases/array_buffers", 15, 1000, &[]).await;

let (_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
let (result, _) = user_rt.run(unix_stream_rx, None, None).await;
let (_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
let (result, _) = user_rt.run(duplex_stream_rx, None, None).await;

match result {
Err(err) => {
Expand All @@ -1500,7 +1499,7 @@ mod test {
memory_limit_mb: u64,
worker_timeout_ms: u64,
) {
let (_unix_stream_tx, unix_stream_rx) = mpsc::unbounded_channel::<UnixStreamEntry>();
let (_duplex_stream_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
let (callback_tx, mut callback_rx) = mpsc::unbounded_channel::<()>();
let mut user_rt =
create_basic_user_runtime(path, memory_limit_mb, worker_timeout_ms, static_patterns)
Expand All @@ -1517,7 +1516,7 @@ mod test {
});

let wait_fut = async move {
let (result, _) = user_rt.run(unix_stream_rx, None, None).await;
let (result, _) = user_rt.run(duplex_stream_rx, None, None).await;

assert_eq!(
result.unwrap_err().to_string(),
Expand Down
6 changes: 3 additions & 3 deletions crates/base/src/rt_worker/implementation/default_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::deno_runtime::DenoRuntime;
use crate::rt_worker::supervisor::CPUUsageMetrics;
use crate::rt_worker::worker::{HandleCreationType, UnixStreamEntry, Worker, WorkerHandler};
use crate::rt_worker::worker::{DuplexStreamEntry, HandleCreationType, Worker, WorkerHandler};
use anyhow::Error;
use event_worker::events::{
BootFailureEvent, EventLoopCompletedEvent, UncaughtExceptionEvent, WorkerEvents,
Expand All @@ -22,14 +22,14 @@ impl WorkerHandler for Worker {
fn handle_creation<'r>(
&self,
created_rt: &'r mut DenoRuntime,
unix_stream_rx: UnboundedReceiver<UnixStreamEntry>,
duplex_stream_rx: UnboundedReceiver<DuplexStreamEntry>,
termination_event_rx: Receiver<WorkerEvents>,
maybe_cpu_usage_metrics_tx: Option<UnboundedSender<CPUUsageMetrics>>,
name: Option<String>,
) -> HandleCreationType<'r> {
let run_worker_rt = async move {
match created_rt
.run(unix_stream_rx, maybe_cpu_usage_metrics_tx, name)
.run(duplex_stream_rx, maybe_cpu_usage_metrics_tx, name)
.await
{
// if the error is execution terminated, check termination event reason
Expand Down
22 changes: 10 additions & 12 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ use event_worker::events::{
};
use futures_util::FutureExt;
use log::{debug, error};
use sb_core::conn_sync::ConnSync;
use sb_core::{MetricSource, RuntimeMetricSource, WorkerMetricSource};
use sb_workers::context::{UserWorkerMsgs, WorkerContextInitOpts};
use std::any::Any;
use std::future::{pending, Future};
use std::pin::Pin;
use tokio::net::UnixStream;
use tokio::io;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::{oneshot, watch};
use tokio::sync::oneshot::{self, Receiver, Sender};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
Expand All @@ -44,14 +42,14 @@ pub struct Worker {
}

pub type HandleCreationType<'r> = Pin<Box<dyn Future<Output = Result<WorkerEvents, Error>> + 'r>>;
pub type UnixStreamEntry = (UnixStream, Option<watch::Receiver<ConnSync>>);
pub type DuplexStreamEntry = (io::DuplexStream, Option<CancellationToken>);

pub trait WorkerHandler: Send {
fn handle_error(&self, error: Error) -> Result<WorkerEvents, Error>;
fn handle_creation<'r>(
&self,
created_rt: &'r mut DenoRuntime,
unix_stream_rx: UnboundedReceiver<UnixStreamEntry>,
duplex_stream_rx: UnboundedReceiver<DuplexStreamEntry>,
termination_event_rx: Receiver<WorkerEvents>,
maybe_cpu_metrics_tx: Option<UnboundedSender<CPUUsageMetrics>>,
name: Option<String>,
Expand Down Expand Up @@ -91,9 +89,9 @@ impl Worker {
pub fn start(
&self,
mut opts: WorkerContextInitOpts,
unix_stream_pair: (
UnboundedSender<UnixStreamEntry>,
UnboundedReceiver<UnixStreamEntry>,
duplex_stream_pair: (
UnboundedSender<DuplexStreamEntry>,
UnboundedReceiver<DuplexStreamEntry>,
),
booter_signal: Sender<Result<MetricSource, Error>>,
termination_token: Option<TerminationToken>,
Expand All @@ -104,7 +102,7 @@ impl Worker {
let event_metadata = self.event_metadata.clone();
let supervisor_policy = self.supervisor_policy;

let (unix_stream_tx, unix_stream_rx) = unix_stream_pair;
let (duplex_stream_tx, duplex_stream_rx) = duplex_stream_pair;
let events_msg_tx = self.events_msg_tx.clone();
let pool_msg_tx = self.pool_msg_tx.clone();

Expand Down Expand Up @@ -244,7 +242,7 @@ impl Worker {
let result = method_cloner
.handle_creation(
&mut runtime,
unix_stream_rx,
duplex_stream_rx,
termination_event_rx,
maybe_cpu_usage_metrics_tx,
Some(worker_name),
Expand Down Expand Up @@ -283,7 +281,7 @@ impl Worker {
}
};

drop(unix_stream_tx);
drop(duplex_stream_tx);

match result {
Ok(event) => {
Expand Down

0 comments on commit 2800bac

Please sign in to comment.