From 8ad5e8c69664aba928110f9d9f8d1ae6b193049b Mon Sep 17 00:00:00 2001
From: Lakshan Perera
Date: Tue, 4 Apr 2023 00:55:38 +1000
Subject: [PATCH] fix: process request body streams

---
 README.md                             |   2 +-
 base/src/js_worker/js/user_workers.js |  13 +-
 base/src/js_worker/user_workers.rs    | 174 +++++++++++++++++++++++---
 examples/foo/index.ts                 |   2 +-
 4 files changed, 167 insertions(+), 24 deletions(-)

diff --git a/README.md b/README.md
index da569bfde..c19af4e41 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Edge Runtime
 
-A server based on [Deno](https://deno.land) runtime, capable of running JavaScript, TypeScript, and WASM services.
+A web server based on [Deno](https://deno.land) runtime, capable of running JavaScript, TypeScript, and WASM services.
 
 ### Why are we building this?
 
diff --git a/base/src/js_worker/js/user_workers.js b/base/src/js_worker/js/user_workers.js
index b54ceaa64..5873b956d 100644
--- a/base/src/js_worker/js/user_workers.js
+++ b/base/src/js_worker/js/user_workers.js
@@ -2,7 +2,7 @@
 ((window) => {
   const { TypeError } = window.__bootstrap.primordials;
-  const { readableStreamForRid } = window.__bootstrap.streams;
+  const { readableStreamForRid, writableStreamForRid } = window.__bootstrap.streams;
   const core = window.Deno.core;
   const ops = core.ops;
 
@@ -24,7 +24,16 @@
       hasBody,
     };
 
-    const res = await core.opAsync("op_user_worker_fetch", this.key, userWorkerReq);
+    const { requestRid, requestBodyRid } = await ops.op_user_worker_fetch_build(userWorkerReq);
+
+    // stream the request body
+    if (hasBody) {
+      let writableStream = writableStreamForRid(requestBodyRid);
+      body.pipeTo(writableStream);
+    }
+
+    const res = await core.opAsync("op_user_worker_fetch_send", this.key, requestRid);
+
     const bodyStream = readableStreamForRid(res.bodyRid);
     return new Response(bodyStream, {
       headers: res.headers,
diff --git a/base/src/js_worker/user_workers.rs b/base/src/js_worker/user_workers.rs
index 0c0c14677..2272be5ee 100644
--- a/base/src/js_worker/user_workers.rs
+++ b/base/src/js_worker/user_workers.rs
@@ -7,12 +7,13 @@ use deno_core::futures::StreamExt;
 use deno_core::include_js_files;
 use deno_core::op;
 use deno_core::AsyncRefCell;
-use deno_core::CancelTryFuture;
 use deno_core::Extension;
 use deno_core::OpState;
 use deno_core::Resource;
 use deno_core::ResourceId;
+use deno_core::WriteOutcome;
 use deno_core::{AsyncResult, BufView, ByteString, CancelHandle, RcRef};
+use deno_core::{CancelFuture, CancelTryFuture};
 use hyper::body::HttpBody;
 use hyper::header::{HeaderName, HeaderValue};
 use hyper::{Body, Request, Response};
@@ -23,6 +24,8 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
 use tokio::sync::{mpsc, oneshot};
 
 pub fn init() -> Extension {
@@ -33,7 +36,8 @@
         ))
         .ops(vec![
             op_user_worker_create::decl(),
-            op_user_worker_fetch::decl(),
+            op_user_worker_fetch_build::decl(),
+            op_user_worker_fetch_send::decl(),
         ])
         .build()
 }
@@ -88,6 +92,13 @@ pub struct UserWorkerRequest {
     has_body: bool,
 }
 
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UserWorkerBuiltRequest {
+    request_rid: ResourceId,
+    request_body_rid: Option<ResourceId>,
+}
+
 #[derive(Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct UserWorkerResponse {
@@ -97,14 +108,63 @@ pub struct UserWorkerResponse {
     body_rid: ResourceId,
 }
 
-type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
+struct UserWorkerRequestResource(Request<Body>);
 
-struct UserWorkerRquestBodyResource {
-    writer: AsyncRefCell>,
+impl Resource for UserWorkerRequestResource {
+    fn name(&self) -> Cow<str> {
+        "userWorkerRequest".into()
+    }
+}
+
+struct UserWorkerRequestBodyResource {
+    body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
     cancel: CancelHandle,
-    size: Option,
 }
 
+impl Resource for UserWorkerRequestBodyResource {
+    fn name(&self) -> Cow<str> {
+        "userWorkerRequestBody".into()
+    }
+
+    fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
+        Box::pin(async move {
+            let bytes: bytes::Bytes = buf.into();
+            let nwritten = bytes.len();
+            let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+            let cancel = RcRef::map(self, |r| &r.cancel);
+            body.send(Some(bytes))
+                .or_cancel(cancel)
+                .await?
+                .map_err(|_| type_error("request body receiver not connected (request closed)"))?;
+            Ok(WriteOutcome::Full { nwritten })
+        })
+    }
+
+    fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
+        Box::pin(async move {
+            let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
+            let cancel = RcRef::map(self, |r| &r.cancel);
+            // There is a case where hyper knows the size of the response body up
+            // front (through content-length header on the resp), where it will drop
+            // the body once that content length has been reached, regardless of if
+            // the stream is complete or not. This is expected behaviour, but it means
+            // that if you stream a body with an up front known size (eg a Blob),
+            // explicit shutdown can never succeed because the body (and by extension
+            // the receiver) will have dropped by the time we try to shutdown. As such
+            // we ignore if the receiver is closed, because we know that the request
+            // is complete in good health in that case.
+            body.send(None).or_cancel(cancel).await?.ok();
+            Ok(())
+        })
+    }
+
+    fn close(self: Rc<Self>) {
+        self.cancel.cancel()
+    }
+}
+
+type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
+
 struct UserWorkerResponseBodyResource {
     reader: AsyncRefCell<Peekable<BytesStream>>,
     cancel: CancelHandle,
@@ -159,18 +219,20 @@ impl Resource for UserWorkerResponseBodyResource {
 }
 
 #[op]
-pub async fn op_user_worker_fetch(
-    state: Rc<RefCell<OpState>>,
-    key: String,
+pub fn op_user_worker_fetch_build(
+    state: &mut OpState,
     req: UserWorkerRequest,
-) -> Result<UserWorkerResponse, AnyError> {
-    let mut op_state = state.borrow_mut();
-    let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
-    let (result_tx, result_rx) = oneshot::channel::<Result<Response<Body>, Error>>();
-
+) -> Result<UserWorkerBuiltRequest, AnyError> {
     let mut body = Body::empty();
+    let mut request_body_rid = None;
     if req.has_body {
-        //body = Body::wrap_stream(stream);
+        let (stream, tx) = MpscByteStream::new();
+        body = Body::wrap_stream(stream);
+
+        request_body_rid = Some(state.resource_table.add(UserWorkerRequestBodyResource {
+            body: AsyncRefCell::new(tx),
+            cancel: CancelHandle::default(),
+        }));
     }
 
     let mut request = Request::builder()
@@ -183,18 +245,47 @@ pub async fn op_user_worker_fetch(
     for (key, value) in req.headers {
         if !key.is_empty() {
             let header_name = HeaderName::try_from(key).unwrap();
-            let header_value = HeaderValue::try_from(value).unwrap_or(HeaderValue::from_static(""));
+            let mut header_value =
+                HeaderValue::try_from(value).unwrap_or(HeaderValue::from_static(""));
 
-            // skip content-length header, this would be re-calculated and added to the request
-            if header_name.eq("content-length") {
-                continue;
+            // if request has no body explicitly set the content-length to 0
+            if !req.has_body && header_name.eq("content-length") {
+                header_value = HeaderValue::from(0);
             }
 
            request.headers_mut().append(header_name, header_value);
         }
     }
 
-    tx.send(UserWorkerMsgs::SendRequest(key, request, result_tx));
+    let request_rid = state.resource_table.add(UserWorkerRequestResource(request));
+
+    Ok(UserWorkerBuiltRequest {
+        request_rid,
+        request_body_rid,
+    })
+}
+
+#[op]
+pub async fn op_user_worker_fetch_send(
+    state: Rc<RefCell<OpState>>,
+    key: String,
+    rid: ResourceId,
+) -> Result<UserWorkerResponse, AnyError> {
+    let mut op_state = state.borrow_mut();
+    let tx = op_state
+        .borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
+        .clone();
+
+    let request = op_state
+        .resource_table
+        .take::<UserWorkerRequestResource>(rid)?;
+    drop(op_state);
+
+    let request = Rc::try_unwrap(request)
+        .ok()
+        .expect("multiple op_user_worker_fetch_send ongoing");
+    let (result_tx, result_rx) = oneshot::channel::<Result<Response<Body>, Error>>();
+    tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx));
 
     let result = result_rx.await;
     if result.is_err() {
@@ -227,6 +318,8 @@ pub async fn op_user_worker_fetch(
             .into_body()
             .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))),
     );
+
+    let mut op_state = state.borrow_mut();
     let body_rid = op_state.resource_table.add(UserWorkerResponseBodyResource {
         reader: AsyncRefCell::new(stream.peekable()),
         cancel: CancelHandle::default(),
@@ -241,3 +334,44 @@ pub async fn op_user_worker_fetch(
     };
     Ok(response)
 }
+
+// [copied from https://github.com/denoland/deno/blob/v1.31.3/ext/fetch/byte_stream.rs]
+// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
+// used to bridge between the fetch task and the HTTP body stream. The stream
+// has the special property that it errors if the channel is closed before an
+// explicit EOF is sent (in the form of a [None] value on the sender).
+pub struct MpscByteStream {
+    receiver: mpsc::Receiver<Option<bytes::Bytes>>,
+    shutdown: bool,
+}
+
+impl MpscByteStream {
+    pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
+        let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
+        let this = Self {
+            receiver,
+            shutdown: false,
+        };
+        (this, sender)
+    }
+}
+
+impl Stream for MpscByteStream {
+    type Item = Result<bytes::Bytes, std::io::Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let val = std::task::ready!(self.receiver.poll_recv(cx));
+        match val {
+            None if self.shutdown => Poll::Ready(None),
+            None => Poll::Ready(Some(Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                "channel closed",
+            )))),
+            Some(None) => {
+                self.shutdown = true;
+                Poll::Ready(None)
+            }
+            Some(Some(val)) => Poll::Ready(Some(Ok(val))),
+        }
+    }
+}
diff --git a/examples/foo/index.ts b/examples/foo/index.ts
index 950b8888c..557f82c27 100644
--- a/examples/foo/index.ts
+++ b/examples/foo/index.ts
@@ -7,7 +7,7 @@ interface reqPayload {
 console.log('server started modified');
 
 serve(async (req: Request) => {
-  const { name } : reqPayload = await req.json()
+  const { name } : reqPayload = await req.json();
   const data = {
     message: `Hello ${name} from foo!`,
     test: 'foo'