Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/base/src/utils/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ impl TestBedBuilder {
}
}

#[derive(Clone)]
pub struct TestBed {
pool_termination_token: TerminationToken,
main_termination_token: TerminationToken,
Expand Down
59 changes: 38 additions & 21 deletions crates/base/src/worker/worker_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ impl Worker {

let _ =
booter_signal.send(Ok((metric_src, new_runtime.drop_token.clone())));

let span = debug_span!(
"poll",
thread = ?std::thread::current().id(),
);

let supervise_fut = match imp.clone().supervise(&mut new_runtime) {
Some(v) => v.boxed(),
None if worker_kind.is_user_worker() => return None,
Expand All @@ -290,28 +296,41 @@ impl Worker {
}
});

let result = imp.on_created(&mut new_runtime).await;
let maybe_uncaught_exception_event = match result.as_ref() {
Ok(WorkerEvents::UncaughtException(ev)) => Some(ev.clone()),
Err(err) => Some(UncaughtExceptionEvent {
cpu_time_used: 0,
exception: err.to_string(),
}),
let worker_poll_fut = async move {
let result = imp.on_created(&mut new_runtime).await;
let maybe_uncaught_exception_event = match result.as_ref() {
Ok(WorkerEvents::UncaughtException(ev)) => Some(ev.clone()),
Err(err) => Some(UncaughtExceptionEvent {
cpu_time_used: 0,
exception: err.to_string(),
}),

_ => None,
};
_ => None,
};

if let Some(ev) = maybe_uncaught_exception_event {
exit.set(WorkerExitStatus::WithUncaughtException(ev)).await;
}
if let Some(ev) = maybe_uncaught_exception_event {
exit.set(WorkerExitStatus::WithUncaughtException(ev)).await;
}

drop(new_runtime);
let _ = supervise_fut.await;
drop(new_runtime);
let _ = supervise_fut.await;

Some(result)
result
}
.instrument(span);

Some(
rt.spawn_pinned({
let fut = unsafe { MaskFutureAsSend::new(worker_poll_fut) };
move || tokio::task::spawn_local(fut)
})
.await
.map_err(anyhow::Error::from)
.and_then(|it| it.map_err(anyhow::Error::from))
.and_then(|it| it.into_inner()),
)
};

let worker_fut = {
let worker_result_fut = {
let event_metadata = event_metadata.clone();
async move {
let Some(result) = worker_fut.await else {
Expand Down Expand Up @@ -349,13 +368,11 @@ impl Worker {
"worker",
id = worker_name.as_str(),
kind = %worker_kind,
thread = ?std::thread::current().id(),
metadata = ?event_metadata
));

drop(rt.spawn_pinned({
let worker_fut = unsafe { MaskFutureAsSend::new(worker_fut) };
move || tokio::task::spawn_local(worker_fut)
drop(tokio::spawn(unsafe {
MaskFutureAsSend::new(worker_result_fut)
}));
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/base/test_cases/issue-func-284/baseline/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Deno.serve(() => {
return new Response("meow");
});
19 changes: 19 additions & 0 deletions crates/base/test_cases/issue-func-284/noisy/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
function mySlowFunction(baseNumber) {
console.time("mySlowFunction");
let now = Date.now();
let result = 0;
for (var i = Math.pow(baseNumber, 7); i >= 0; i--) {
result += Math.atan(i) * Math.tan(i);
}
let duration = Date.now() - now;
console.timeEnd("mySlowFunction");
return { result: result, duration: duration };
}

Deno.serve(async () => {
let count = 0;
while (++count != 300) {
mySlowFunction(8);
}
return new Response("meow");
});
65 changes: 65 additions & 0 deletions crates/base/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use deno_facade::EszipPayloadKind;
use deno_facade::Metadata;
use ext_event_worker::events::LogLevel;
use ext_event_worker::events::ShutdownReason;
use ext_event_worker::events::WorkerEventWithMetadata;
use ext_event_worker::events::WorkerEvents;
use ext_runtime::SharedMetricSource;
use ext_workers::context::MainWorkerRuntimeOpts;
Expand Down Expand Up @@ -2606,6 +2607,70 @@ async fn test_issue_func_280() {
run("mem", ShutdownReason::Memory).await;
}

#[tokio::test]
#[serial]
async fn test_issue_func_284() {
async fn find_boot_event(
rx: &mut mpsc::UnboundedReceiver<WorkerEventWithMetadata>,
) -> Option<usize> {
while let Some(ev) = rx.recv().await {
match ev.event {
WorkerEvents::Boot(ev) => return Some(ev.boot_time),
_ => continue,
}
}

None
}

let (tx, mut rx) = mpsc::unbounded_channel();
let tb = TestBedBuilder::new("./test_cases/main")
.with_per_worker_policy(None)
.with_worker_event_sender(Some(tx))
.build()
.await;

tokio::spawn({
let tb = tb.clone();
async move {
tb.request(|b| {
b.uri("/meow")
.header("x-service-path", "issue-func-284/noisy")
.body(Body::empty())
.context("can't make request")
})
.await
.unwrap();
}
});

timeout(Duration::from_secs(1), find_boot_event(&mut rx))
.await
.unwrap()
.unwrap();

tokio::spawn({
let tb = tb.clone();
async move {
tb.request(|b| {
b.uri("/meow")
.header("x-service-path", "issue-func-284/baseline")
.body(Body::empty())
.context("can't make request")
})
.await
.unwrap();
}
});

let boot_time = timeout(Duration::from_secs(1), find_boot_event(&mut rx))
.await
.unwrap()
.unwrap();

assert!(boot_time < 1000);
}

#[tokio::test]
#[serial]
async fn test_should_render_detailed_failed_to_create_graph_error() {
Expand Down