Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(runtime,serverless,cli): properly handle errors and timeouts while streaming #416

Merged
merged 2 commits into from Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/heavy-kings-float.md
@@ -0,0 +1,6 @@
---
'@lagon/cli': patch
'@lagon/serverless': patch
---

Stop streaming and log errors when we have errors/timeouts/memory limits
5 changes: 5 additions & 0 deletions .changeset/hip-mails-shop.md
@@ -0,0 +1,5 @@
---
'@lagon/runtime': patch
---

Add promise_reject_callback to always throw errors
5 changes: 5 additions & 0 deletions .changeset/rude-items-brush.md
@@ -0,0 +1,5 @@
---
'@lagon/serverless': patch
---

Handle termination results (timeouts and memory limit) before processing streaming to avoid hanging
20 changes: 13 additions & 7 deletions packages/cli/src/commands/dev.rs
Expand Up @@ -22,7 +22,8 @@ use tokio::sync::Mutex;
use tokio_util::task::LocalPoolHandle;

use crate::utils::{
bundle_function, info, input, success, validate_code_file, validate_public_dir, warn, Assets,
bundle_function, error, info, input, success, validate_code_file, validate_public_dir, warn,
Assets,
};

use log::{
Expand Down Expand Up @@ -72,7 +73,7 @@ fn parse_environment_variables(env: Option<PathBuf>) -> Result<HashMap<String, S

// This function is similar to packages/serverless/src/main.rs,
// expect that we don't have multiple deployments and such multiple
// threads to manage.
// threads to manage, and we don't manager logs and metrics.
async fn handle_request(
req: HyperRequest<Body>,
ip: String,
Expand Down Expand Up @@ -177,16 +178,21 @@ async fn handle_request(
}

tokio::spawn(async move {
while let Ok(RunResult::Stream(stream_result)) = rx.recv_async().await {
match stream_result {
StreamResult::Start(response) => {
while let Ok(result) = rx.recv_async().await {
match result {
RunResult::Stream(StreamResult::Start(response)) => {
response_tx.send_async(response).await.unwrap_or(());
}
StreamResult::Data(bytes) => {
RunResult::Stream(StreamResult::Data(bytes)) => {
let bytes = Bytes::from(bytes);
stream_tx.send_async(Ok(bytes)).await.unwrap_or(());
}
_ => {}
_ => {
println!("{} {:?}", error("Unexpected stream result:"), result);
// Close the stream by sending empty bytes if we receive anything
// else than StreamResult (e.g errors)
stream_tx.send_async(Ok(Bytes::new())).await.unwrap_or(());
}
}
}
});
Expand Down
59 changes: 44 additions & 15 deletions packages/runtime/src/isolate/mod.rs
Expand Up @@ -12,12 +12,11 @@ use std::{
};

use futures::{future::poll_fn, stream::FuturesUnordered, Future, StreamExt};
use v8::PromiseState;

use crate::{
http::{FromV8, IntoV8, Request, Response, RunResult, StreamResult},
runtime::{get_runtime_code, POOL},
utils::{v8_boolean, v8_string, v8_uint8array},
utils::{extract_v8_string, v8_boolean, v8_string, v8_uint8array},
};

use self::bindings::{BindingResult, PromiseResult};
Expand All @@ -39,6 +38,23 @@ extern "C" fn heap_limit_callback(
current_heap_limit * 2
}

extern "C" fn promise_reject_callback(message: v8::PromiseRejectMessage) {
if message.get_event() == v8::PromiseRejectEvent::PromiseRejectWithNoHandler {
let scope = &mut unsafe { v8::CallbackScope::new(&message) };
let message = message.get_value().map_or_else(
|| "Unknown promise rejected error".to_owned(),
|value| {
extract_v8_string(value, scope)
.unwrap_or_else(|_| "Failed to extract promise reject message".to_owned())
},
);

let isolate = Isolate::state(scope);
let mut state = isolate.borrow_mut();
state.promise_rejected_message = Some(message);
}
}

// We don't allow imports at all, so we return None and throw an error
// so it can be catched later. As the error message suggests, all code
// should be bundled into a single file.
Expand Down Expand Up @@ -72,6 +88,7 @@ struct IsolateState {
handler_result: Option<v8::Global<v8::Promise>>,
stream_sender: flume::Sender<StreamResult>,
metadata: Rc<Metadata>,
promise_rejected_message: Option<String>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -198,6 +215,9 @@ impl Isolate {
// }

let mut isolate = v8::Isolate::new(params);
isolate.set_capture_stack_trace_for_uncaught_exceptions(true, 4);
isolate.set_promise_reject_callback(promise_reject_callback);

let (stream_sender, stream_receiver) = flume::unbounded();

let state: IsolateState = {
Expand All @@ -211,6 +231,7 @@ impl Isolate {
handler_result: None,
stream_sender,
metadata: Rc::clone(&options.metadata),
promise_rejected_message: None,
}
};

Expand Down Expand Up @@ -374,9 +395,6 @@ impl Isolate {

if !isolate_state.promises.is_empty() {
promises = Some(Vec::new());
}

if !isolate_state.promises.is_empty() {
self.running_promises.store(true, Ordering::SeqCst);

while let Poll::Ready(Some(BindingResult { id, result })) =
Expand Down Expand Up @@ -447,21 +465,31 @@ impl Isolate {
self.resolve_promises(cx);
self.poll_stream(tx);

// Handle termination results like timeouts and memory limit before
// checking the streaming status and promise state.
if let Ok(run_result) = self.termination_rx.as_ref().unwrap().try_recv() {
tx.send(run_result).unwrap_or(());
return Poll::Ready(());
}

let isolate_state = Isolate::state(&self.isolate);
let state = isolate_state.borrow();

if let Some(promise_rejected_message) = &state.promise_rejected_message {
tx.send(RunResult::Error(promise_rejected_message.to_string()))
.unwrap_or(());
return Poll::Ready(());
}

if self.stream_response_sent {
if self.stream_status.is_done() {
return Poll::Ready(());
}

cx.waker().wake_by_ref();
return Poll::Pending;
}

if let Ok(run_result) = self.termination_rx.as_ref().unwrap().try_recv() {
tx.send(run_result).unwrap_or(());
return Poll::Ready(());
}

let isolate_state = Isolate::state(&self.isolate);
let state = isolate_state.borrow();
let global = state.global.0.clone();
let scope = &mut v8::HandleScope::with_context(&mut self.isolate, global);
let try_catch = &mut v8::TryCatch::new(scope);
Expand All @@ -470,7 +498,7 @@ impl Isolate {
let promise = promise.open(try_catch);

match promise.state() {
PromiseState::Fulfilled => {
v8::PromiseState::Fulfilled => {
let response = promise.result(try_catch);

let run_result = match Response::from_v8(try_catch, response) {
Expand All @@ -490,6 +518,7 @@ impl Isolate {
return if self.stream_status.is_done() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
};
}
Expand All @@ -498,7 +527,7 @@ impl Isolate {
tx.send(run_result).unwrap_or(());
return Poll::Ready(());
}
PromiseState::Rejected => {
v8::PromiseState::Rejected => {
let exception = promise.result(try_catch);

tx.send(RunResult::Error(get_exception_message(
Expand All @@ -507,7 +536,7 @@ impl Isolate {
.unwrap_or(());
return Poll::Ready(());
}
PromiseState::Pending => {}
v8::PromiseState::Pending => {}
};
}

Expand Down
8 changes: 6 additions & 2 deletions packages/runtime/tests/disallow_codegen.rs
Expand Up @@ -29,7 +29,9 @@ async fn disallow_eval() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught EvalError: Code generation from strings disallowed for this context, at:\n const result = eval('1 + 1')".into())
RunResult::Error(
"EvalError: Code generation from strings disallowed for this context".into()
)
);
}

Expand All @@ -48,6 +50,8 @@ async fn disallow_function() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught EvalError: Code generation from strings disallowed for this context, at:\n const result = new Function('return 1 + 1')".into())
RunResult::Error(
"EvalError: Code generation from strings disallowed for this context".into()
)
);
}
6 changes: 3 additions & 3 deletions packages/runtime/tests/errors.rs
Expand Up @@ -24,7 +24,7 @@ async fn no_handler() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught Error: Handler function is not defined or is not a function, at:\n throw new Error(\"Handler function is not defined or is not a function\");".into())
RunResult::Error("Error: Handler function is not defined or is not a function".into())
);
}

Expand All @@ -37,7 +37,7 @@ async fn handler_not_function() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught Error: Handler function is not defined or is not a function, at:\n throw new Error(\"Handler function is not defined or is not a function\");".into())
RunResult::Error("Error: Handler function is not defined or is not a function".into())
);
}

Expand All @@ -55,7 +55,7 @@ async fn handler_reject() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("Uncaught Error: Rejected, at:\n throw new Error('Rejected');".into())
RunResult::Error("Error: Rejected".into())
);
}

Expand Down
10 changes: 2 additions & 8 deletions packages/runtime/tests/fetch.rs
Expand Up @@ -326,10 +326,7 @@ async fn throw_invalid_url() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error(
"Uncaught Error: client requires absolute-form URIs, at:\n throw new Error(error);"
.into()
)
RunResult::Error("Error: client requires absolute-form URIs".into())
);
}

Expand All @@ -354,10 +351,7 @@ async fn throw_invalid_header() {

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error(
"Uncaught Error: failed to parse header value, at:\n throw new Error(error);"
.into()
)
RunResult::Error("Error: failed to parse header value".into())
);
}

Expand Down
94 changes: 94 additions & 0 deletions packages/runtime/tests/streams.rs
Expand Up @@ -244,3 +244,97 @@ async fn response_before_write() {
);
assert!(rx.recv_async().await.is_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn timeout_infinite_streaming() {
setup();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const { readable } = new TransformStream()

return new Response(readable);
}"
.to_owned(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Stream(StreamResult::Start(Response::from(
"[object ReadableStream]"
)))
);
assert_eq!(rx.recv_async().await.unwrap(), RunResult::Timeout);
}

#[tokio::test(flavor = "multi_thread")]
async fn promise_reject_callback() {
setup();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const { readable } = new TransformStream()

async function trigger() {
doesNotExists();
}

trigger();

return new Response(readable);
}"
.to_owned(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("ReferenceError: doesNotExists is not defined".to_owned())
);
assert!(rx.recv_async().await.is_err());
}

#[tokio::test(flavor = "multi_thread")]
async fn promise_reject_callback_after_response() {
setup();
let mut isolate = Isolate::new(IsolateOptions::new(
"export function handler() {
const output = new TextEncoder().encode('This is rendered as binary stream with non-ASCII chars 😊');

const { readable, writable } = new TransformStream();

async function stream() {
// Just to delay a bit
await fetch('https://google.com');

const writer = writable.getWriter();
for (let i = 0; i < output.length; i++) {
await new Promise(resolve => {
doesNotExists(resolve, 0);
});
writer.write(new Uint8Array([output[i]]));
}
}

stream();

return new Response(readable);
}"
.to_owned(),
));
let (tx, rx) = flume::unbounded();
isolate.run(Request::default(), tx).await;

assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Stream(StreamResult::Start(Response::from(
"[object ReadableStream]"
)))
);
assert_eq!(
rx.recv_async().await.unwrap(),
RunResult::Error("ReferenceError: doesNotExists is not defined".to_owned())
);
assert!(rx.recv_async().await.is_err());
}