Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ impl Service<Request<Body>> for WorkerService {
}
};

// If the token has already been canceled, drop the socket connection
// without sending a response.
if cancel.is_cancelled() {
error!("connection aborted (uri: {:?})", req_uri.to_string());
return Err(anyhow!(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
"connection aborted"
)));
}

let res = match res {
Ok(res) => {
let (parts, body) = res.into_parts();
Expand Down
7 changes: 7 additions & 0 deletions crates/base/test_cases/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ Deno.serve(async (req: Request) => {
} catch (e) {
console.error(e);

if (e instanceof Deno.errors.InvalidWorkerResponse) {
if (e.message === "connection closed before message completed") {
// Give up on handling response and close http connection
return;
}
}

// if (e instanceof Deno.errors.WorkerRequestCancelled) {
// return await callWorker();
// }
Expand Down
5 changes: 5 additions & 0 deletions crates/base/test_cases/return-invalid-resp-2/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";

serve(() => {
return 1;
});
3 changes: 3 additions & 0 deletions crates/base/test_cases/return-invalid-resp/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Deno.serve(() => {
return 1;
});
104 changes: 81 additions & 23 deletions crates/base/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::borrow::Cow;
use std::collections::HashMap;
use std::error::Error;
use std::io;
use std::io::BufRead;
use std::io::Cursor;
Expand Down Expand Up @@ -1250,21 +1251,25 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit() {
120 * MB,
None,
|resp| async {
let res = resp.unwrap();
if let Err(err) = resp {
assert_connection_aborted(err);
} else {
let res = resp.unwrap();

assert_eq!(res.status().as_u16(), 500);
assert_eq!(res.status().as_u16(), 500);

let res = res.json::<ErrorResponsePayload>().await;
let res = res.json::<ErrorResponsePayload>().await;

assert!(res.is_ok());
assert!(res.is_ok());

let msg = res.unwrap().msg;
let msg = res.unwrap().msg;

assert!(
msg
== "WorkerRequestCancelled: request has been cancelled by supervisor"
|| msg == "broken pipe"
);
assert!(
msg
== "WorkerRequestCancelled: request has been cancelled by supervisor"
|| msg == "broken pipe"
);
}
},
)
.await;
Expand All @@ -1278,24 +1283,28 @@ async fn req_failure_case_op_cancel_from_server_due_to_cpu_resource_limit_2() {
10 * MB,
Some("image/png"),
|resp| async {
let res = resp.unwrap();
if let Err(err) = resp {
assert_connection_aborted(err);
} else {
let res = resp.unwrap();

assert_eq!(res.status().as_u16(), 500);
assert_eq!(res.status().as_u16(), 500);

let res = res.json::<ErrorResponsePayload>().await;
let res = res.json::<ErrorResponsePayload>().await;

assert!(res.is_ok());
assert!(res.is_ok());

let msg = res.unwrap().msg;
let msg = res.unwrap().msg;

assert!(
!msg.starts_with("TypeError: request body receiver not connected")
);
assert!(
msg
== "WorkerRequestCancelled: request has been cancelled by supervisor"
|| msg == "broken pipe"
);
assert!(
!msg.starts_with("TypeError: request body receiver not connected")
);
assert!(
msg
== "WorkerRequestCancelled: request has been cancelled by supervisor"
|| msg == "broken pipe"
);
}
},
)
.await;
Expand Down Expand Up @@ -4246,6 +4255,39 @@ async fn test_pin_package_version_correctly() {
);
}

#[tokio::test]
#[serial]
async fn test_drop_socket_when_http_handler_returns_an_invalid_value() {
{
integration_test!(
"./test_cases/main",
NON_SECURE_PORT,
"return-invalid-resp",
None,
None,
None,
(|resp| async {
assert_connection_aborted(resp.unwrap_err());
}),
TerminationToken::new()
);
}
{
integration_test!(
"./test_cases/main",
NON_SECURE_PORT,
"return-invalid-resp-2",
None,
None,
None,
(|resp| async {
assert_connection_aborted(resp.unwrap_err());
}),
TerminationToken::new()
);
}
}

#[derive(Deserialize)]
struct ErrorResponsePayload {
msg: String,
Expand Down Expand Up @@ -4382,3 +4424,19 @@ fn new_localhost_tls(secure: bool) -> Option<Tls> {
Tls::new(SECURE_PORT, TLS_LOCALHOST_KEY, TLS_LOCALHOST_CERT).unwrap()
})
}

fn assert_connection_aborted(err: reqwest::Error) {
let source = err.source();
let hyper_err = source
.and_then(|err| err.downcast_ref::<hyper::Error>())
.unwrap();

if hyper_err.is_incomplete_message() {
return;
}

let cause = hyper_err.source().unwrap();
let cause = cause.downcast_ref::<std::io::Error>().unwrap();

assert_eq!(cause.kind(), std::io::ErrorKind::ConnectionReset);
}
6 changes: 6 additions & 0 deletions examples/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ Deno.serve(async (req: Request) => {

// return await callWorker();
}
if (e instanceof Deno.errors.InvalidWorkerResponse) {
if (e.message === "connection closed before message completed") {
// Give up on handling response and close http connection
return;
}
}

const error = { msg: e.toString() };
return new Response(
Expand Down
19 changes: 16 additions & 3 deletions ext/runtime/conn_sync.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
use std::sync::Arc;

use deno_core::unsync::sync::AtomicFlag;
use deno_core::Resource;
use tokio_util::sync::CancellationToken;

pub struct ConnWatcher(pub Option<CancellationToken>);
pub struct ConnWatcher(pub Option<CancellationToken>, pub Arc<AtomicFlag>);

impl Drop for ConnWatcher {
fn drop(&mut self) {
if let Some(token) = self.0.as_ref() {
if !self.1.is_raised() {
token.cancel();
}
}
}
}

impl Resource for ConnWatcher {
fn name(&self) -> std::borrow::Cow<str> {
Expand All @@ -10,7 +23,7 @@ impl Resource for ConnWatcher {
}

impl ConnWatcher {
pub fn get(&self) -> Option<CancellationToken> {
self.0.clone()
pub fn into_inner(mut self) -> Option<CancellationToken> {
self.0.take()
}
}
3 changes: 2 additions & 1 deletion ext/runtime/js/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { RequestPrototype } from "ext:deno_fetch/23_request.js";
import {
fromInnerResponse,
newInnerResponse,
ResponsePrototype,
} from "ext:deno_fetch/23_response.js";
import { upgradeWebSocket } from "ext:deno_http/02_websocket.ts";
import { HttpConn } from "ext:runtime/01_http.js";
Expand Down Expand Up @@ -245,7 +246,7 @@ async function respond(requestEvent, httpConn, options, snapshot) {
}
}

if (span) {
if (ObjectPrototypeIsPrototypeOf(ResponsePrototype, response) && span) {
updateSpanFromResponse(span, response);
}

Expand Down
18 changes: 16 additions & 2 deletions ext/runtime/ops/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::Poll;

use anyhow::bail;
use anyhow::Context;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::unsync::sync::AtomicFlag;
use deno_core::ByteString;
use deno_core::OpState;
use deno_core::RcRef;
Expand Down Expand Up @@ -66,6 +68,7 @@ where
io: Option<(S, Option<CancellationToken>)>,
state: StreamState,
wait_fut: Option<BoxFuture<'static, ()>>,
pub written: Arc<AtomicFlag>,
}

impl<S> Drop for Stream2<S>
Expand Down Expand Up @@ -107,6 +110,7 @@ where
io: Some((stream, token)),
state: StreamState::Normal,
wait_fut: None,
written: Arc::default(),
}
}

Expand Down Expand Up @@ -145,8 +149,13 @@ where
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let written = self.written.clone();
if let Some((stream, _)) = Pin::into_inner(self).io.as_mut() {
Pin::new(stream).poll_write(cx, buf)
let ret = ready!(Pin::new(stream).poll_write(cx, buf));
if ret.is_ok() {
written.raise();
}
Poll::Ready(ret)
} else {
Poll::Ready(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))
}
Expand All @@ -157,8 +166,13 @@ where
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
let written = self.written.clone();
if let Some((stream, _)) = Pin::into_inner(self).io.as_mut() {
Pin::new(stream).poll_write_vectored(cx, bufs)
let ret = ready!(Pin::new(stream).poll_write_vectored(cx, bufs));
if ret.is_ok() {
written.raise();
}
Poll::Ready(ret)
} else {
Poll::Ready(Err(std::io::Error::from(std::io::ErrorKind::BrokenPipe)))
}
Expand Down
12 changes: 4 additions & 8 deletions ext/runtime/ops/http_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,10 @@ fn op_http_start(

// set a hardcoded address
let addr: std::net::SocketAddr = "0.0.0.0:9999".parse().unwrap();
let conn = http_create_conn_resource(
state,
DuplexStream2::new(stream, token.clone()),
addr,
"http",
);

let conn_watcher = state.resource_table.add(ConnWatcher(token));
let stream = DuplexStream2::new(stream, token.clone());
let written = stream.written.clone();
let conn = http_create_conn_resource(state, stream, addr, "http");
let conn_watcher = state.resource_table.add(ConnWatcher(token, written));

return Ok((conn, conn_watcher));
}
Expand Down
2 changes: 1 addition & 1 deletion ext/workers/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ pub async fn op_user_worker_fetch_send(
.map(Rc::try_unwrap);

let conn_token = match conn_token {
Some(Ok(it)) => it.get(),
Some(Ok(it)) => it.into_inner(),
Some(Err(_)) => {
error!("failed to unwrap connection watcher");
None
Expand Down