README.md: 2 changes (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
# Edge Runtime

A server based on [Deno](https://deno.land) runtime, capable of running JavaScript, TypeScript, and WASM services.
A web server based on [Deno](https://deno.land) runtime, capable of running JavaScript, TypeScript, and WASM services.

### Why are we building this?

base/src/js_worker/js/user_workers.js: 13 changes (11 additions, 2 deletions)
@@ -2,7 +2,7 @@

((window) => {
const { TypeError } = window.__bootstrap.primordials;
const { readableStreamForRid } = window.__bootstrap.streams;
const { readableStreamForRid, writableStreamForRid } = window.__bootstrap.streams;
const core = window.Deno.core;
const ops = core.ops;

@@ -24,7 +24,16 @@
hasBody,
};

const res = await core.opAsync("op_user_worker_fetch", this.key, userWorkerReq);
const { requestRid, requestBodyRid } = await ops.op_user_worker_fetch_build(userWorkerReq);

// stream the request body
if (hasBody) {
let writableStream = writableStreamForRid(requestBodyRid);
body.pipeTo(writableStream);
}

const res = await core.opAsync("op_user_worker_fetch_send", this.key, requestRid);

const bodyStream = readableStreamForRid(res.bodyRid);
return new Response(bodyStream, {
headers: res.headers,
base/src/js_worker/user_workers.rs: 174 changes (154 additions, 20 deletions)
@@ -7,12 +7,13 @@ use deno_core::futures::StreamExt;
use deno_core::include_js_files;
use deno_core::op;
use deno_core::AsyncRefCell;
use deno_core::CancelTryFuture;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::WriteOutcome;
use deno_core::{AsyncResult, BufView, ByteString, CancelHandle, RcRef};
use deno_core::{CancelFuture, CancelTryFuture};
use hyper::body::HttpBody;
use hyper::header::{HeaderName, HeaderValue};
use hyper::{Body, Request, Response};
@@ -23,6 +24,8 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use tokio::sync::{mpsc, oneshot};

pub fn init() -> Extension {
@@ -33,7 +36,8 @@ pub fn init() -> Extension {
))
.ops(vec![
op_user_worker_create::decl(),
op_user_worker_fetch::decl(),
op_user_worker_fetch_build::decl(),
op_user_worker_fetch_send::decl(),
])
.build()
}
@@ -88,6 +92,13 @@ pub struct UserWorkerRequest {
has_body: bool,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UserWorkerBuiltRequest {
request_rid: ResourceId,
request_body_rid: Option<ResourceId>,
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct UserWorkerResponse {
@@ -97,14 +108,63 @@ pub struct UserWorkerResponse {
body_rid: ResourceId,
}

type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
struct UserWorkerRequestResource(Request<Body>);

struct UserWorkerRquestBodyResource {
writer: AsyncRefCell<Peekable<BytesStream>>,
impl Resource for UserWorkerRequestResource {
fn name(&self) -> Cow<str> {
"userWorkerRequest".into()
}
}

struct UserWorkerRequestBodyResource {
body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
cancel: CancelHandle,
size: Option<u64>,
}

impl Resource for UserWorkerRequestBodyResource {
fn name(&self) -> Cow<str> {
"userWorkerRequestBody".into()
}

fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> {
Box::pin(async move {
let bytes: bytes::Bytes = buf.into();
let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
body.send(Some(bytes))
.or_cancel(cancel)
.await?
.map_err(|_| type_error("request body receiver not connected (request closed)"))?;
Ok(WriteOutcome::Full { nwritten })
})
}

fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(async move {
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
// There is a case where hyper knows the size of the response body up
// front (through content-length header on the resp), where it will drop
// the body once that content length has been reached, regardless of if
// the stream is complete or not. This is expected behaviour, but it means
// that if you stream a body with an up front known size (eg a Blob),
// explicit shutdown can never succeed because the body (and by extension
// the receiver) will have dropped by the time we try to shutdown. As such
// we ignore if the receiver is closed, because we know that the request
// is complete in good health in that case.
body.send(None).or_cancel(cancel).await?.ok();
Ok(())
})
}

fn close(self: Rc<Self>) {
self.cancel.cancel()
}
}

type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;

struct UserWorkerResponseBodyResource {
reader: AsyncRefCell<Peekable<BytesStream>>,
cancel: CancelHandle,
@@ -159,18 +219,20 @@ impl Resource for UserWorkerResponseBodyResource {
}

#[op]
pub async fn op_user_worker_fetch(
state: Rc<RefCell<OpState>>,
key: String,
pub fn op_user_worker_fetch_build(
state: &mut OpState,
req: UserWorkerRequest,
) -> Result<UserWorkerResponse, AnyError> {
let mut op_state = state.borrow_mut();
let tx = op_state.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>();
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();

) -> Result<UserWorkerBuiltRequest, AnyError> {
let mut body = Body::empty();
let mut request_body_rid = None;
if req.has_body {
//body = Body::wrap_stream(stream);
let (stream, tx) = MpscByteStream::new();
body = Body::wrap_stream(stream);

request_body_rid = Some(state.resource_table.add(UserWorkerRequestBodyResource {
body: AsyncRefCell::new(tx),
cancel: CancelHandle::default(),
}));
}

let mut request = Request::builder()
@@ -183,18 +245,47 @@ pub async fn op_user_worker_fetch(
for (key, value) in req.headers {
if !key.is_empty() {
let header_name = HeaderName::try_from(key).unwrap();
let header_value = HeaderValue::try_from(value).unwrap_or(HeaderValue::from_static(""));
let mut header_value =
HeaderValue::try_from(value).unwrap_or(HeaderValue::from_static(""));

// skip content-length header, this would be re-calculated and added to the request
if header_name.eq("content-length") {
continue;
// if request has no body explicitly set the content-length to 0
if !req.has_body && header_name.eq("content-length") {
header_value = HeaderValue::from(0);
}

request.headers_mut().append(header_name, header_value);
}
}

tx.send(UserWorkerMsgs::SendRequest(key, request, result_tx));
let request_rid = state.resource_table.add(UserWorkerRequestResource(request));

Ok(UserWorkerBuiltRequest {
request_rid,
request_body_rid,
})
}

#[op]
pub async fn op_user_worker_fetch_send(
state: Rc<RefCell<OpState>>,
key: String,
rid: ResourceId,
) -> Result<UserWorkerResponse, AnyError> {
let mut op_state = state.borrow_mut();
let tx = op_state
.borrow::<mpsc::UnboundedSender<UserWorkerMsgs>>()
.clone();

let request = op_state
.resource_table
.take::<UserWorkerRequestResource>(rid)?;
drop(op_state);

let request = Rc::try_unwrap(request)
.ok()
.expect("multiple op_user_worker_fetch_send ongoing");
let (result_tx, result_rx) = oneshot::channel::<Response<Body>>();
tx.send(UserWorkerMsgs::SendRequest(key, request.0, result_tx));

let result = result_rx.await;
if result.is_err() {
@@ -227,6 +318,8 @@ pub async fn op_user_worker_fetch(
.into_body()
.map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))),
);

let mut op_state = state.borrow_mut();
let body_rid = op_state.resource_table.add(UserWorkerResponseBodyResource {
reader: AsyncRefCell::new(stream.peekable()),
cancel: CancelHandle::default(),
@@ -241,3 +334,44 @@
};
Ok(response)
}

// [copied from https://github.com/denoland/deno/blob/v1.31.3/ext/fetch/byte_stream.rs]
// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
// used to bridge between the fetch task and the HTTP body stream. The stream
// has the special property that it errors if the channel is closed before an
// explicit EOF is sent (in the form of a [None] value on the sender).
pub struct MpscByteStream {
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
shutdown: bool,
}

impl MpscByteStream {
pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
let this = Self {
receiver,
shutdown: false,
};
(this, sender)
}
}

impl Stream for MpscByteStream {
type Item = Result<bytes::Bytes, std::io::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let val = std::task::ready!(self.receiver.poll_recv(cx));
match val {
None if self.shutdown => Poll::Ready(None),
None => Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"channel closed",
)))),
Some(None) => {
self.shutdown = true;
Poll::Ready(None)
}
Some(Some(val)) => Poll::Ready(Some(Ok(val))),
}
}
}
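
Note (illustrative sketch, not part of the diff): the snippet below shows the streaming contract the new request-body plumbing relies on, assuming the `MpscByteStream` above is in scope and that `tokio` (with the `rt` and `macros` features), `bytes`, and hyper 0.14 with its `stream` feature are available. Chunks pushed through the sender, as `UserWorkerRequestBodyResource::write` does, surface as hyper body data; the explicit `None` sent by `shutdown` is a clean EOF; and a send after the receiver has been dropped (hyper reached a known content-length) simply errors, which `shutdown` deliberately ignores.

```rust
use hyper::body::HttpBody;
use hyper::Body;

#[tokio::main]
async fn main() {
    // op_user_worker_fetch_build wires the stream half into the hyper request body.
    let (stream, tx) = MpscByteStream::new();
    let mut body = Body::wrap_stream(stream);

    // UserWorkerRequestBodyResource::write forwards each chunk as Some(bytes)...
    tx.send(Some(bytes::Bytes::from_static(b"hello"))).await.unwrap();
    assert_eq!(&body.data().await.unwrap().unwrap()[..], b"hello");

    // ...and shutdown sends an explicit None, which MpscByteStream reports as a
    // clean end-of-stream rather than an UnexpectedEof error.
    tx.send(None).await.unwrap();
    assert!(body.data().await.is_none());

    // If hyper drops the body early (e.g. a known content-length was reached),
    // the receiver side is gone and send() fails; shutdown ignores that error
    // because the request has already completed in good health.
    let (stream, tx) = MpscByteStream::new();
    drop(stream);
    assert!(tx.send(None).await.is_err());
}
```

This mirrors the behaviour described in the `shutdown()` comment above without needing a running user worker.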
examples/foo/index.ts: 2 changes (1 addition, 1 deletion)
@@ -7,7 +7,7 @@ interface reqPayload {
console.log('server started modified');

serve(async (req: Request) => {
const { name } : reqPayload = await req.json()
const { name } : reqPayload = await req.json();
const data = {
message: `Hello ${name} from foo!`,
test: 'foo'