From 26ed99f6ea8aaddde89e079f871fdaacd2add33f Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 15 Feb 2024 01:08:11 +0000 Subject: [PATCH 01/12] stamp(sb_workers): polishing --- crates/sb_workers/user_workers.js | 27 ++++++--------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/crates/sb_workers/user_workers.js b/crates/sb_workers/user_workers.js index 72bc701a9..bf4159fe1 100644 --- a/crates/sb_workers/user_workers.js +++ b/crates/sb_workers/user_workers.js @@ -1,28 +1,18 @@ import { primordials, core } from "ext:core/mod.js"; -const { - TypeError, -} = primordials; - import { readableStreamForRid, writableStreamForRid } from 'ext:deno_web/06_streams.js'; import { getWatcherRid } from 'ext:sb_core_main_js/js/http.js'; + +const NO_CONN_WATCHER_WARN_MSG = `Unable to find the connection watcher from the request instance.\n\ +Invoke \`EdgeRuntime.applyConnectionWatcher(origReq, newReq)\` if you have cloned the original request.` + const ops = core.ops; +const { TypeError } = primordials; const { op_user_worker_fetch_send, op_user_worker_create } = core.ensureFastOps(); -// interface WorkerOptions { -// servicePath: string; -// memoryLimitMb?: number; -// workerTimeoutMs?: number; -// noModuleCache?: boolean; -// importMapPath?: string; -// envVars?: Array -// } - -const chunkExpression = /(?:^|\W)chunked(?:$|\W)/i; - function nullBodyStatus(status) { return status === 101 || status === 204 || status === 205 || status === 304; } @@ -46,17 +36,12 @@ class UserWorker { signal?.throwIfAborted(); if (watcherRid === void 0) { - console.warn(`Unable to find the connection watcher from the request instance.\n\ -Invoke \`EdgeRuntime.applyConnectionWatcher(origReq, newReq)\` if you have cloned the original request.`); + console.warn(NO_CONN_WATCHER_WARN_MSG); } const headersArray = Array.from(headers.entries()); const hasReqBody = !bodyUsed && !!body; - // const hasReqBody = !bodyUsed && !!body && - // (chunkExpression.test(headers.get('transfer-encoding')) || - // Number.parseInt(headers.get('content-length'), 10) > 0); - const userWorkerReq = { method, url, From 640ddc482986140dc7ce72a4bd50cfe39a52b940 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sun, 18 Feb 2024 00:34:12 +0000 Subject: [PATCH 02/12] stamp: add `http_utils` crate --- crates/http_utils/Cargo.toml | 12 ++++ crates/http_utils/src/io/mod.rs | 5 ++ crates/http_utils/src/io/rewind.rs | 84 +++++++++++++++++++++++++++ crates/http_utils/src/io/upgraded2.rs | 75 ++++++++++++++++++++++++ crates/http_utils/src/lib.rs | 2 + crates/http_utils/src/utils.rs | 29 +++++++++ 6 files changed, 207 insertions(+) create mode 100644 crates/http_utils/Cargo.toml create mode 100644 crates/http_utils/src/io/mod.rs create mode 100644 crates/http_utils/src/io/rewind.rs create mode 100644 crates/http_utils/src/io/upgraded2.rs create mode 100644 crates/http_utils/src/lib.rs create mode 100644 crates/http_utils/src/utils.rs diff --git a/crates/http_utils/Cargo.toml b/crates/http_utils/Cargo.toml new file mode 100644 index 000000000..fbf4e67ff --- /dev/null +++ b/crates/http_utils/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "http_utils" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { workspace = true } +tokio-util = { workspace = true, features = ["rt"] } +futures-util = { workspace = true } +bytes = { workspace = true } +hyper = { workspace = true, features = ["full"] } +http = { version = "0.2" } \ No newline at end of file diff --git a/crates/http_utils/src/io/mod.rs b/crates/http_utils/src/io/mod.rs new file mode 100644 index 000000000..fd99c30cd --- /dev/null +++ b/crates/http_utils/src/io/mod.rs @@ -0,0 +1,5 @@ +mod rewind; +mod upgraded2; + +pub use self::rewind::Rewind; +pub use self::upgraded2::Upgraded2; diff --git a/crates/http_utils/src/io/rewind.rs b/crates/http_utils/src/io/rewind.rs new file mode 100644 index 000000000..7940594a1 --- /dev/null +++ b/crates/http_utils/src/io/rewind.rs @@ -0,0 +1,84 @@ +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{cmp, io}; + +use bytes::{Buf, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +/// Combine a buffer with an IO, rewinding reads to use the buffer. +#[derive(Debug)] +pub struct Rewind { + pre: Option, + inner: T, +} + +impl Rewind { + pub fn new_buffered(io: T, buf: Bytes) -> Self { + Rewind { + pre: Some(buf), + inner: io, + } + } +} + +impl AsyncRead for Rewind +where + T: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if let Some(mut prefix) = self.pre.take() { + // If there are no remaining bytes, let the bytes get dropped. + if !prefix.is_empty() { + let copy_len = cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&prefix[..copy_len]); + prefix.advance(copy_len); + // Put back what's left + if !prefix.is_empty() { + self.pre = Some(prefix); + } + + return Poll::Ready(Ok(())); + } + } + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for Rewind +where + T: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} diff --git a/crates/http_utils/src/io/upgraded2.rs b/crates/http_utils/src/io/upgraded2.rs new file mode 100644 index 000000000..155ec01e6 --- /dev/null +++ b/crates/http_utils/src/io/upgraded2.rs @@ -0,0 +1,75 @@ +use std::{ + fmt, io, + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use super::Rewind; + +trait Io: AsyncRead + AsyncWrite + Unpin + 'static {} + +impl Io for T {} + +pub struct Upgraded2 { + io: Rewind>, +} + +impl Upgraded2 { + pub fn new(io: T, read_buf: Bytes) -> Self + where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + Upgraded2 { + io: Rewind::new_buffered(Box::new(io), read_buf), + } + } +} + +impl AsyncRead for Upgraded2 { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.io).poll_read(cx, buf) + } +} + +impl AsyncWrite for Upgraded2 { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.io).poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.io).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.io).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.io).poll_shutdown(cx) + } + + fn is_write_vectored(&self) -> bool { + self.io.is_write_vectored() + } +} + +impl fmt::Debug for Upgraded2 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Upgraded2").finish() + } +} diff --git a/crates/http_utils/src/lib.rs b/crates/http_utils/src/lib.rs new file mode 100644 index 000000000..94db7d321 --- /dev/null +++ b/crates/http_utils/src/lib.rs @@ -0,0 +1,2 @@ +pub mod io; +pub mod utils; diff --git a/crates/http_utils/src/utils.rs b/crates/http_utils/src/utils.rs new file mode 100644 index 000000000..e15bf3a36 --- /dev/null +++ b/crates/http_utils/src/utils.rs @@ -0,0 +1,29 @@ +use http::{header, response, HeaderMap, Response, StatusCode}; +use hyper::Body; + +pub fn get_upgrade_type(headers: &HeaderMap) -> Option { + let connection_header_exists = headers + .get(header::CONNECTION) + .map(|it| { + it.to_str() + .unwrap_or("") + .split(',') + .any(|str| str.trim() == header::UPGRADE) + }) + .unwrap_or(false); + + if connection_header_exists { + if let Some(upgrade) = headers.get(header::UPGRADE) { + return upgrade.to_str().ok().map(str::to_owned); + } + } + + None +} + +pub fn emit_status_code(status: StatusCode) -> Response { + response::Builder::new() + .status(status) + .body(Body::empty()) + .unwrap() +} From 5b955a42afccd50998bdf0abd010d66e852eb95f Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sun, 18 Feb 2024 00:34:32 +0000 Subject: [PATCH 03/12] stamp: update dependencies --- crates/base/Cargo.toml | 3 ++- crates/sb_workers/Cargo.toml | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index 7d35f6e1b..d172140a4 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +http_utils = { version = "0.1.0", path = "../http_utils" } async-trait.workspace = true thiserror.workspace = true monch.workspace = true @@ -31,7 +32,7 @@ deno_webidl = { workspace = true } deno_web = { workspace = true } deno_websocket = { workspace = true } httparse = { version = "1.8.0" } -hyper = { workspace = true, features = ["full"] } +hyper = { workspace = true, features = ["full", "backports"] } http = { version = "0.2" } import_map.workspace = true log = { workspace = true } diff --git a/crates/sb_workers/Cargo.toml b/crates/sb_workers/Cargo.toml index db756e863..76d2f5184 100644 --- a/crates/sb_workers/Cargo.toml +++ b/crates/sb_workers/Cargo.toml @@ -23,6 +23,7 @@ log.workspace = true enum-as-inner.workspace = true futures-util.workspace = true thiserror.workspace = true +http_utils = { version = "0.1.0", path = "../http_utils" } event_worker = { version = "0.1.0", path = "../event_worker" } sb_graph = { version = "0.1.0", path = "../sb_graph" } sb_core = { version = "0.1.0", path = "../sb_core" } From 16218b4fecc9a545995d3f240e0eb800baf02cc8 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sun, 18 Feb 2024 00:59:41 +0000 Subject: [PATCH 04/12] stamp: update `Cargo.lock` --- Cargo.lock | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 106e29be8..2a9678498 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -351,6 +351,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "async-tungstenite", "bytes", "cityhash", "cpu_timer", @@ -380,6 +381,7 @@ dependencies = [ "flume", "futures-util", "http 0.2.11", + "http_utils", "httparse", "hyper 0.14.28", "import_map", @@ -2617,6 +2619,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http_utils" +version = "0.1.0" +dependencies = [ + "bytes", + "futures-util", + "http 0.2.11", + "hyper 0.14.28", + "tokio", + "tokio-util", +] + [[package]] name = "httparse" version = "1.8.0" @@ -4795,6 +4809,7 @@ dependencies = [ "enum-as-inner 0.6.0", "event_worker", "futures-util", + "http_utils", "hyper 0.14.28", "log", "sb_core", From 2bc8854b41fb184ce6346f975077c7f961075b40 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sun, 18 Feb 2024 01:05:59 +0000 Subject: [PATCH 05/12] refactor: remove watcher api and introduce supabase tag api --- crates/sb_core/js/01_http.js | 554 ++++++++++++++++++++++++++++++ crates/sb_core/js/http.js | 31 +- crates/sb_core/js/main_worker.js | 6 +- crates/sb_core/lib.rs | 1 + crates/sb_workers/lib.rs | 1 + crates/sb_workers/user_workers.js | 26 +- 6 files changed, 595 insertions(+), 24 deletions(-) create mode 100644 crates/sb_core/js/01_http.js diff --git a/crates/sb_core/js/01_http.js b/crates/sb_core/js/01_http.js new file mode 100644 index 000000000..187811f79 --- /dev/null +++ b/crates/sb_core/js/01_http.js @@ -0,0 +1,554 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { core, internals, primordials } from "ext:core/mod.js"; +const { + BadResourcePrototype, + InterruptedPrototype, +} = core; +import { + op_http_accept, + op_http_headers, + op_http_shutdown, + op_http_upgrade_websocket, + op_http_websocket_accept_header, + op_http_write, + op_http_write_headers, + op_http_write_resource, +} from "ext:core/ops"; +const { + ArrayPrototypeIncludes, + ArrayPrototypeMap, + ArrayPrototypePush, + ObjectPrototypeIsPrototypeOf, + SafeSet, + SafeSetIterator, + SetPrototypeAdd, + SetPrototypeDelete, + StringPrototypeCharCodeAt, + StringPrototypeIncludes, + StringPrototypeSplit, + StringPrototypeToLowerCase, + StringPrototypeToUpperCase, + Symbol, + SymbolAsyncIterator, + TypeError, + TypedArrayPrototypeGetSymbolToStringTag, + Uint8Array, +} = primordials; + +import { InnerBody } from "ext:deno_fetch/22_body.js"; +import { Event, setEventTargetData } from "ext:deno_web/02_event.js"; +import { BlobPrototype } from "ext:deno_web/09_file.js"; +import { + fromInnerResponse, + newInnerResponse, + ResponsePrototype, + toInnerResponse, +} from "ext:deno_fetch/23_response.js"; +import { + fromInnerRequest, + newInnerRequest, + toInnerRequest, +} from "ext:deno_fetch/23_request.js"; +import { AbortController } from "ext:deno_web/03_abort_signal.js"; +import { + _eventLoop, + _idleTimeoutDuration, + _idleTimeoutTimeout, + _protocol, + _readyState, + _rid, + _role, + _server, + _serverHandleIdleTimeout, + createWebSocketBranded, + SERVER, + WebSocket, +} from "ext:deno_websocket/01_websocket.js"; +import { + getReadableStreamResourceBacking, + readableStreamClose, + readableStreamForRid, + ReadableStreamPrototype, +} from "ext:deno_web/06_streams.js"; +import { serve } from "ext:deno_http/00_serve.js"; +import { SymbolDispose } from "ext:deno_web/00_infra.js"; + +const connErrorSymbol = Symbol("connError"); + +/** @type {(self: HttpConn, rid: number) => boolean} */ +let deleteManagedResource; + +class HttpConn { + #rid = 0; + #closed = false; + #remoteAddr; + #localAddr; + + // This set holds resource ids of resources + // that were created during lifecycle of this request. + // When the connection is closed these resources should be closed + // as well. + #managedResources = new SafeSet(); + + static { + deleteManagedResource = (self, rid) => + SetPrototypeDelete(self.#managedResources, rid); + } + + constructor(rid, remoteAddr, localAddr) { + this.#rid = rid; + this.#remoteAddr = remoteAddr; + this.#localAddr = localAddr; + } + + /** @returns {number} */ + get rid() { + return this.#rid; + } + + /** @returns {Promise} */ + async nextRequest() { + let nextRequest; + try { + nextRequest = await op_http_accept(this.#rid); + } catch (error) { + this.close(); + // A connection error seen here would cause disrupted responses to throw + // a generic `BadResource` error. Instead store this error and replace + // those with it. + this[connErrorSymbol] = error; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) || + ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error) || + StringPrototypeIncludes(error.message, "connection closed") + ) { + return null; + } + throw error; + } + if (nextRequest == null) { + // Work-around for servers (deno_std/http in particular) that call + // `nextRequest()` before upgrading a previous request which has a + // `connection: upgrade` header. + await null; + + this.close(); + return null; + } + + const { 0: streamRid, 1: method, 2: url } = nextRequest; + SetPrototypeAdd(this.#managedResources, streamRid); + + /** @type {ReadableStream | undefined} */ + let body = null; + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + if (method !== "GET" && method !== "HEAD") { + body = readableStreamForRid(streamRid, false); + } + + const innerRequest = newInnerRequest( + method, + url, + () => op_http_headers(streamRid), + body !== null ? new InnerBody(body) : null, + false, + ); + + const abortController = new AbortController(); + const request = fromInnerRequest( + innerRequest, + abortController.signal, + "immutable", + false, + ); + + const respondWith = createRespondWith( + this, + streamRid, + abortController, + ); + + return { request, respondWith, streamRid }; + } + + /** @returns {void} */ + close() { + if (!this.#closed) { + this.#closed = true; + core.close(this.#rid); + for (const rid of new SafeSetIterator(this.#managedResources)) { + SetPrototypeDelete(this.#managedResources, rid); + core.close(rid); + } + } + } + + [SymbolDispose]() { + core.tryClose(this.#rid); + for (const rid of new SafeSetIterator(this.#managedResources)) { + SetPrototypeDelete(this.#managedResources, rid); + core.tryClose(rid); + } + } + + [SymbolAsyncIterator]() { + // deno-lint-ignore no-this-alias + const httpConn = this; + return { + async next() { + const reqEvt = await httpConn.nextRequest(); + // Change with caution, current form avoids a v8 deopt + return { value: reqEvt ?? undefined, done: reqEvt === null }; + }, + }; + } +} + +function createRespondWith( + httpConn, + streamRid, + abortController, +) { + return async function respondWith(resp) { + try { + resp = await resp; + if (!(ObjectPrototypeIsPrototypeOf(ResponsePrototype, resp))) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } + + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) { + throw new TypeError("Body is unusable."); + } + if ( + ObjectPrototypeIsPrototypeOf( + ReadableStreamPrototype, + innerResp.body.streamOrStatic, + ) + ) { + if ( + innerResp.body.length === null || + ObjectPrototypeIsPrototypeOf( + BlobPrototype, + innerResp.body.source, + ) + ) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } + } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + } + } else { + respBody = new Uint8Array(0); + } + const isStreamingResponseBody = !( + typeof respBody === "string" || + TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array" + ); + try { + await op_http_write_headers( + streamRid, + innerResp.status ?? 200, + innerResp.headerList, + isStreamingResponseBody ? null : respBody, + ); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if ( + respBody !== null && + ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) + ) { + await respBody.cancel(error); + } + throw error; + } + + if (isStreamingResponseBody) { + let success = false; + if ( + respBody === null || + !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) + ) { + throw new TypeError("Unreachable"); + } + const resourceBacking = getReadableStreamResourceBacking(respBody); + let reader; + if (resourceBacking) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); + } + reader = respBody.getReader(); // Acquire JS lock. + try { + await op_http_write_resource( + streamRid, + resourceBacking.rid, + ); + if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); + readableStreamClose(respBody); // Release JS lock. + success = true; + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } else { + reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if ( + TypedArrayPrototypeGetSymbolToStringTag(value) !== "Uint8Array" + ) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; + } + try { + await op_http_write(streamRid, value); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } + success = true; + } + + if (success) { + try { + await op_http_shutdown(streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } + } + } + + const ws = resp[_ws]; + if (ws) { + const wsRid = await op_http_upgrade_websocket( + streamRid, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + httpConn.close(); + + ws[_readyState] = WebSocket.OPEN; + ws[_role] = SERVER; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); + } + } catch (error) { + abortController.abort(error); + throw error; + } finally { + if (deleteManagedResource(httpConn, streamRid)) { + core.close(streamRid); + } + } + }; +} + +const _ws = Symbol("[[associated_ws]]"); +const websocketCvf = buildCaseInsensitiveCommaValueFinder("websocket"); +const upgradeCvf = buildCaseInsensitiveCommaValueFinder("upgrade"); + +function upgradeWebSocket(request, options = {}) { + const inner = toInnerRequest(request); + const upgrade = request.headers.get("upgrade"); + const upgradeHasWebSocketOption = upgrade !== null && + websocketCvf(upgrade); + if (!upgradeHasWebSocketOption) { + throw new TypeError( + "Invalid Header: 'upgrade' header must contain 'websocket'", + ); + } + + const connection = request.headers.get("connection"); + const connectionHasUpgradeOption = connection !== null && + upgradeCvf(connection); + if (!connectionHasUpgradeOption) { + throw new TypeError( + "Invalid Header: 'connection' header must contain 'Upgrade'", + ); + } + + const websocketKey = request.headers.get("sec-websocket-key"); + if (websocketKey === null) { + throw new TypeError( + "Invalid Header: 'sec-websocket-key' header must be set", + ); + } + + const accept = op_http_websocket_accept_header(websocketKey); + + const r = newInnerResponse(101); + r.headerList = [ + ["upgrade", "websocket"], + ["connection", "Upgrade"], + ["sec-websocket-accept", accept], + ]; + + const protocolsStr = request.headers.get("sec-websocket-protocol") || ""; + const protocols = StringPrototypeSplit(protocolsStr, ", "); + if (protocols && options.protocol) { + if (ArrayPrototypeIncludes(protocols, options.protocol)) { + ArrayPrototypePush(r.headerList, [ + "sec-websocket-protocol", + options.protocol, + ]); + } else { + throw new TypeError( + `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`, + ); + } + } + + const socket = createWebSocketBranded(WebSocket); + setEventTargetData(socket); + socket[_server] = true; + socket[_idleTimeoutDuration] = options.idleTimeout ?? 120; + socket[_idleTimeoutTimeout] = null; + + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeWebSocket", r, socket); + } + + const response = fromInnerResponse(r, "immutable"); + + response[_ws] = socket; + + return { response, socket }; +} + +const spaceCharCode = StringPrototypeCharCodeAt(" ", 0); +const tabCharCode = StringPrototypeCharCodeAt("\t", 0); +const commaCharCode = StringPrototypeCharCodeAt(",", 0); + +/** Builds a case function that can be used to find a case insensitive + * value in some text that's separated by commas. + * + * This is done because it doesn't require any allocations. + * @param checkText {string} - The text to find. (ex. "websocket") + */ +function buildCaseInsensitiveCommaValueFinder(checkText) { + const charCodes = ArrayPrototypeMap( + StringPrototypeSplit( + StringPrototypeToLowerCase(checkText), + "", + ), + (c) => [ + StringPrototypeCharCodeAt(c, 0), + StringPrototypeCharCodeAt(StringPrototypeToUpperCase(c), 0), + ], + ); + /** @type {number} */ + let i; + /** @type {number} */ + let char; + + /** @param {string} value */ + return function (value) { + for (i = 0; i < value.length; i++) { + char = StringPrototypeCharCodeAt(value, i); + skipWhitespace(value); + + if (hasWord(value)) { + skipWhitespace(value); + if (i === value.length || char === commaCharCode) { + return true; + } + } else { + skipUntilComma(value); + } + } + + return false; + }; + + /** @param value {string} */ + function hasWord(value) { + for (let j = 0; j < charCodes.length; ++j) { + const { 0: cLower, 1: cUpper } = charCodes[j]; + if (cLower === char || cUpper === char) { + char = StringPrototypeCharCodeAt(value, ++i); + } else { + return false; + } + } + return true; + } + + /** @param value {string} */ + function skipWhitespace(value) { + while (char === spaceCharCode || char === tabCharCode) { + char = StringPrototypeCharCodeAt(value, ++i); + } + } + + /** @param value {string} */ + function skipUntilComma(value) { + while (char !== commaCharCode && i < value.length) { + char = StringPrototypeCharCodeAt(value, ++i); + } + } +} + +// Expose this function for unit tests +internals.buildCaseInsensitiveCommaValueFinder = + buildCaseInsensitiveCommaValueFinder; + +export { _ws, HttpConn, serve, upgradeWebSocket }; \ No newline at end of file diff --git a/crates/sb_core/js/http.js b/crates/sb_core/js/http.js index a5de988b5..4256380f5 100644 --- a/crates/sb_core/js/http.js +++ b/crates/sb_core/js/http.js @@ -1,6 +1,8 @@ -import { HttpConn } from 'ext:deno_http/01_http.js'; -import { RequestPrototype } from 'ext:deno_fetch/23_request.js'; +import "ext:deno_http/01_http.js"; + import { core, primordials } from "ext:core/mod.js"; +import { RequestPrototype } from "ext:deno_fetch/23_request.js"; +import { HttpConn, upgradeWebSocket } from "ext:sb_core_main_js/js/01_http.js"; const { internalRidSymbol } = core; const { ObjectPrototypeIsPrototypeOf } = primordials; @@ -9,7 +11,7 @@ const HttpConnPrototypeNextRequest = HttpConn.prototype.nextRequest; const HttpConnPrototypeClose = HttpConn.prototype.close; const ops = core.ops; -const watcher = Symbol("watcher"); +const kSupabaseTag = Symbol("kSupabaseTag"); function internalServerError() { // "Internal Server Error" @@ -52,7 +54,10 @@ function serveHttp(conn) { return null; } - nextRequest.request[watcher] = watcherRid; + nextRequest.request[kSupabaseTag] = { + watcherRid, + streamRid: nextRequest.streamRid + }; return nextRequest; }; @@ -138,19 +143,25 @@ async function serve(args1, args2) { }; } -function getWatcherRid(req) { - return req[watcher]; +function getSupabaseTag(req) { + return req[kSupabaseTag]; } -function applyWatcherRid(src, dest) { +function applySupabaseTag(src, dest) { if ( !ObjectPrototypeIsPrototypeOf(RequestPrototype, src) || !ObjectPrototypeIsPrototypeOf(RequestPrototype, dest) ) { - throw new TypeError("Only Request instance can apply the connection watcher"); + throw new TypeError("Only Request instance can apply the supabase tag"); } - dest[watcher] = src[watcher]; + dest[kSupabaseTag] = src[kSupabaseTag]; } -export { serve, serveHttp, getWatcherRid, applyWatcherRid }; +export { + serve, + serveHttp, + getSupabaseTag, + applySupabaseTag, + upgradeWebSocket +}; diff --git a/crates/sb_core/js/main_worker.js b/crates/sb_core/js/main_worker.js index a92da55fd..afcbc1396 100644 --- a/crates/sb_core/js/main_worker.js +++ b/crates/sb_core/js/main_worker.js @@ -1,5 +1,5 @@ import { SUPABASE_USER_WORKERS } from "ext:sb_user_workers/user_workers.js"; -import { applyWatcherRid } from "ext:sb_core_main_js/js/http.js"; +import { applySupabaseTag } from "ext:sb_core_main_js/js/http.js"; import { core } from "ext:core/mod.js"; const ops = core.ops; @@ -9,9 +9,7 @@ Object.defineProperty(globalThis, 'EdgeRuntime', { return { userWorkers: SUPABASE_USER_WORKERS, getRuntimeMetrics: () => /* async */ ops.op_runtime_metrics(), - applyConnectionWatcher: (src, dest) => { - applyWatcherRid(src, dest); - } + applySupabaseTag: (src, dest) => applySupabaseTag(src, dest), }; }, configurable: true, diff --git a/crates/sb_core/lib.rs b/crates/sb_core/lib.rs index 4bed74689..024906958 100644 --- a/crates/sb_core/lib.rs +++ b/crates/sb_core/lib.rs @@ -303,5 +303,6 @@ deno_core::extension!( "js/navigator.js", "js/bootstrap.js", "js/main_worker.js", + "js/01_http.js" ] ); diff --git a/crates/sb_workers/lib.rs b/crates/sb_workers/lib.rs index a75eb28fa..19adbc67c 100644 --- a/crates/sb_workers/lib.rs +++ b/crates/sb_workers/lib.rs @@ -372,6 +372,7 @@ pub async fn op_user_worker_fetch_send( state: Rc>, #[string] key: String, #[smi] rid: ResourceId, + #[smi] stream_rid: ResourceId, #[smi] watcher_rid: Option, ) -> Result { let (tx, request) = { diff --git a/crates/sb_workers/user_workers.js b/crates/sb_workers/user_workers.js index bf4159fe1..18d475d80 100644 --- a/crates/sb_workers/user_workers.js +++ b/crates/sb_workers/user_workers.js @@ -1,9 +1,9 @@ import { primordials, core } from "ext:core/mod.js"; import { readableStreamForRid, writableStreamForRid } from 'ext:deno_web/06_streams.js'; -import { getWatcherRid } from 'ext:sb_core_main_js/js/http.js'; +import { getSupabaseTag } from 'ext:sb_core_main_js/js/http.js'; -const NO_CONN_WATCHER_WARN_MSG = `Unable to find the connection watcher from the request instance.\n\ -Invoke \`EdgeRuntime.applyConnectionWatcher(origReq, newReq)\` if you have cloned the original request.` +const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\ +Invoke \`EdgeRuntime.applySupabaseTag(origReq, newReq)\` if you have cloned the original request.` const ops = core.ops; @@ -28,25 +28,25 @@ class UserWorker { } async fetch(req, opts = {}) { - const watcherRid = getWatcherRid(req); + const tag = getSupabaseTag(req); const { method, url, headers, body, bodyUsed } = req; const { signal } = opts; signal?.throwIfAborted(); - if (watcherRid === void 0) { - console.warn(NO_CONN_WATCHER_WARN_MSG); + if (tag === void 0) { + console.warn(NO_SUPABASE_TAG_WARN_MSG); } const headersArray = Array.from(headers.entries()); - const hasReqBody = !bodyUsed && !!body; + const hasBody = !bodyUsed && !!body; const userWorkerReq = { method, url, + hasBody, headers: headersArray, - hasBody: hasReqBody, }; const { requestRid, requestBodyRid } = await ops.op_user_worker_fetch_build( @@ -55,12 +55,18 @@ class UserWorker { // stream the request body let reqBodyPromise = null; - if (hasReqBody) { + if (hasBody) { let writableStream = writableStreamForRid(requestBodyRid); reqBodyPromise = body.pipeTo(writableStream, { signal }); } - const resPromise = op_user_worker_fetch_send(this.key, requestRid, watcherRid); + const resPromise = op_user_worker_fetch_send( + this.key, + requestRid, + tag.streamRid, + tag.watcherRid + ); + let [sent, res] = await Promise.allSettled([reqBodyPromise, resPromise]); if (sent.status === "rejected") { From 982d730bb829e7676fcd8dbcfccf1c5872600a17 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sun, 18 Feb 2024 01:07:26 +0000 Subject: [PATCH 06/12] feat: websocket upgrade support --- crates/base/build.rs | 4 +- crates/base/src/deno_runtime.rs | 4 +- crates/base/src/rt_worker/worker_ctx.rs | 90 +++++++++++++--- crates/base/src/server.rs | 5 +- crates/base/src/utils.rs | 29 ++++++ crates/sb_core/http.rs | 131 ++++++++++++++++++++++++ crates/sb_core/http_start.rs | 99 +----------------- crates/sb_core/js/denoOverrides.js | 9 +- crates/sb_core/lib.rs | 1 + crates/sb_workers/lib.rs | 75 +++++++++----- 10 files changed, 302 insertions(+), 145 deletions(-) create mode 100644 crates/sb_core/http.rs diff --git a/crates/base/build.rs b/crates/base/build.rs index 10f3a33fa..a805be099 100644 --- a/crates/base/build.rs +++ b/crates/base/build.rs @@ -12,7 +12,8 @@ mod supabase_startup_snapshot { use event_worker::js_interceptors::sb_events_js_interceptors; use event_worker::sb_user_event_worker; use sb_ai::sb_ai; - use sb_core::http_start::sb_core_http; + use sb_core::http::sb_core_http; + use sb_core::http_start::sb_core_http_start; use sb_core::net::sb_core_net; use sb_core::permissions::sb_core_permissions; use sb_core::runtime::sb_core_runtime; @@ -206,6 +207,7 @@ mod supabase_startup_snapshot { sb_core_main_js::init_ops_and_esm(), sb_core_net::init_ops_and_esm(), sb_core_http::init_ops_and_esm(), + sb_core_http_start::init_ops_and_esm(), deno_node::init_ops_and_esm::(None, fs), sb_core_runtime::init_ops_and_esm(None), ]; diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 0e56a19d0..ed41c5646 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -20,6 +20,8 @@ use futures_util::future::poll_fn; use log::{error, trace}; use once_cell::sync::{Lazy, OnceCell}; use sb_core::conn_sync::ConnSync; +use sb_core::http::sb_core_http; +use sb_core::http_start::sb_core_http_start; use sb_core::util::sync::AtomicFlag; use serde::de::DeserializeOwned; use std::collections::HashMap; @@ -37,7 +39,6 @@ use sb_ai::sb_ai; use sb_core::cache::CacheSetting; use sb_core::cert::ValueRootCertStoreProvider; use sb_core::external_memory::custom_allocator; -use sb_core::http_start::sb_core_http; use sb_core::net::sb_core_net; use sb_core::permissions::{sb_core_permissions, Permissions}; use sb_core::runtime::sb_core_runtime; @@ -286,6 +287,7 @@ impl DenoRuntime { sb_core_main_js::init_ops(), sb_core_net::init_ops(), sb_core_http::init_ops(), + sb_core_http_start::init_ops(), deno_node::init_ops::(Some(npm_resolver), file_system), sb_core_runtime::init_ops(Some(main_module_url.clone())), ]; diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index f366a2bf6..a25583f52 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -1,6 +1,6 @@ use crate::deno_runtime::DenoRuntime; -use crate::utils::send_event_if_event_worker_available; use crate::utils::units::bytes_to_display; +use crate::utils::{emit_status_code, get_upgrade_type, send_event_if_event_worker_available}; use crate::rt_worker::worker::{Worker, WorkerHandler}; use crate::rt_worker::worker_pool::WorkerPool; @@ -9,6 +9,10 @@ use cpu_timer::CPUTimer; use event_worker::events::{ BootEvent, ShutdownEvent, WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed, }; +use http::StatusCode; +use http_utils::io::Upgraded2; +use hyper::client::conn::http1; +use hyper::upgrade::OnUpgrade; use hyper::{Body, Request, Response}; use log::{debug, error}; use sb_core::conn_sync::ConnSync; @@ -22,6 +26,7 @@ use sb_workers::errors::WorkerError; use std::future::pending; use std::path::PathBuf; use std::sync::Arc; +use tokio::io::copy_bidirectional; use tokio::net::UnixStream; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, oneshot, watch, Notify}; @@ -83,43 +88,96 @@ async fn handle_request( // create a unix socket pair let (sender_stream, recv_stream) = UnixStream::pair()?; let WorkerRequestMsg { - req, + mut req, res_tx, conn_watch, } = msg; let _ = unix_stream_tx.send((recv_stream, conn_watch.clone())); + let req_upgrade_type = get_upgrade_type(req.headers()); + let req_upgrade = req_upgrade_type + .clone() + .and_then(|it| Some(it).zip(req.extensions_mut().remove::())); // send the HTTP request to the worker over Unix stream - let (mut request_sender, connection) = hyper::client::conn::handshake(sender_stream).await?; + let (mut request_sender, connection) = http1::Builder::new() + .writev(true) + .handshake(sender_stream) + .await?; + + let (upgrade_tx, upgrade_rx) = oneshot::channel(); // spawn a task to poll the connection and drive the HTTP state - tokio::task::spawn(async move { - match connection.without_shutdown().await { - Err(e) => { - error!("Error in worker connection: {}", e.message(),); - } + tokio::task::spawn({ + async move { + match connection.without_shutdown().await { + Err(e) => { + error!("Error in worker connection: {}", e.message()); + } + + Ok(parts) => { + if let Some((requested, req_upgrade)) = req_upgrade { + if let Ok((Some(accepted), status)) = upgrade_rx.await { + if status == StatusCode::SWITCHING_PROTOCOLS && accepted == requested { + tokio::spawn(relay_upgraded_request_and_response( + req_upgrade, + parts, + )); - Ok(parts) => { - if let Some(mut watcher) = conn_watch { - if watcher.wait_for(|it| *it == ConnSync::Recv).await.is_err() { - error!("cannot track outbound connection correctly"); + return; + } + }; } - } - drop(parts); + if let Some(mut watcher) = conn_watch { + if watcher.wait_for(|it| *it == ConnSync::Recv).await.is_err() { + error!("cannot track outbound connection correctly"); + } + } + } } } }); tokio::task::yield_now().await; - let result = request_sender.send_request(req).await; - let _ = res_tx.send(result); + let res = request_sender.send_request(req).await; + let Ok(res) = res else { + drop(res_tx.send(res)); + return Ok(()); + }; + + if let Some(requested) = req_upgrade_type { + let res_upgrade_type = get_upgrade_type(res.headers()); + let _ = upgrade_tx.send((res_upgrade_type.clone(), res.status())); + + match res_upgrade_type { + Some(accepted) if accepted == requested => {} + _ => { + drop(res_tx.send(Ok(emit_status_code(StatusCode::BAD_GATEWAY)))); + return Ok(()); + } + } + } + drop(res_tx.send(Ok(res))); Ok(()) } +async fn relay_upgraded_request_and_response( + downstream: OnUpgrade, + parts: http1::Parts, +) { + let mut upstream = Upgraded2::new(parts.io, parts.read_buf); + let mut downstream = downstream.await.expect("failed to upgrade request"); + + copy_bidirectional(&mut upstream, &mut downstream) + .await + .expect("coping between upgraded connections failed"); + + // XXX(Nyannyacha): Here you might want to emit the event metadata. +} + #[allow(clippy::too_many_arguments)] pub fn create_supervisor( key: Uuid, diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index b2a8a3c98..e85daf191 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -160,7 +160,7 @@ impl Service> for WorkerService { parts, Body::wrap_stream(NotifyOnEos { inner: body, - cancel: Some(cancel.clone()), + cancel: Some(cancel), }), ); @@ -294,7 +294,8 @@ impl Server { let _guard = cancel.drop_guard(); let conn_fut = Http::new() - .serve_connection(conn, service); + .serve_connection(conn, service) + .with_upgrades(); if let Err(e) = conn_fut.await { // Most common cause for these errors are diff --git a/crates/base/src/utils.rs b/crates/base/src/utils.rs index 908cf8813..0c94fd029 100644 --- a/crates/base/src/utils.rs +++ b/crates/base/src/utils.rs @@ -1,4 +1,6 @@ use event_worker::events::{EventMetadata, WorkerEventWithMetadata, WorkerEvents}; +use http::{header, response, HeaderMap, Response, StatusCode}; +use hyper::Body; use tokio::sync::mpsc; pub mod units; @@ -12,3 +14,30 @@ pub fn send_event_if_event_worker_available( let _ = event_worker.send(WorkerEventWithMetadata { event, metadata }); } } + +pub fn get_upgrade_type(headers: &HeaderMap) -> Option { + let connection_header_exists = headers + .get(header::CONNECTION) + .map(|it| { + it.to_str() + .unwrap_or("") + .split(',') + .any(|str| str.trim() == header::UPGRADE) + }) + .unwrap_or(false); + + if connection_header_exists { + if let Some(upgrade) = headers.get(header::UPGRADE) { + return upgrade.to_str().ok().map(str::to_owned); + } + } + + None +} + +pub fn emit_status_code(status: StatusCode) -> Response { + response::Builder::new() + .status(status) + .body(Body::empty()) + .unwrap() +} diff --git a/crates/sb_core/http.rs b/crates/sb_core/http.rs new file mode 100644 index 000000000..dee0a4e33 --- /dev/null +++ b/crates/sb_core/http.rs @@ -0,0 +1,131 @@ +use std::{cell::RefCell, pin::Pin, rc::Rc, task::Poll}; + +use deno_core::{ + error::{custom_error, AnyError}, + op2, Op, OpDecl, OpState, RcRef, ResourceId, +}; +use deno_http::{HttpRequestReader, HttpStreamResource}; +use deno_websocket::ws_create_server_stream; +use futures::pin_mut; +use futures::ready; +use futures::Future; +use hyper::upgrade::Parts; +use log::error; +use tokio::net::UnixStream; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::watch, +}; + +use crate::conn_sync::ConnSync; + +deno_core::extension!( + sb_core_http, + ops = [op_http_upgrade_websocket2], + middleware = sb_http_middleware, +); + +pub(crate) struct UnixStream2(UnixStream, Option>); + +impl UnixStream2 { + pub fn new(stream: UnixStream, watcher: Option>) -> Self { + Self(stream, watcher) + } +} + +impl AsyncRead for UnixStream2 { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0).poll_read(cx, buf) + } +} + +impl AsyncWrite for UnixStream2 { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0).poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + true + } + + #[inline] + fn poll_flush( + self: Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + if let Some(ref mut sync) = self.1 { + let fut = sync.wait_for(|it| *it == ConnSync::Recv); + + pin_mut!(fut); + ready!(fut.poll(cx).map(|it| { + if let Err(ex) = it { + error!("cannot track outbound connection correctly"); + return Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, ex)); + } + + Ok(()) + }))? + } + + Pin::new(&mut Pin::into_inner(self).0).poll_shutdown(cx) + } +} + +fn http_error(message: &'static str) -> AnyError { + custom_error("Http", message) +} + +#[op2(async)] +#[smi] +async fn op_http_upgrade_websocket2( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result { + let stream = state + .borrow_mut() + .resource_table + .get::(rid)?; + + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + + let request = match &mut *rd { + HttpRequestReader::Headers(request) => request, + _ => return Err(http_error("cannot upgrade because request body was used")), + }; + + let upgraded = hyper::upgrade::on(request).await?; + let Parts { io, read_buf, .. } = upgraded.downcast::().unwrap(); + + let ws_rid = ws_create_server_stream(&mut state.borrow_mut(), io.0.into(), read_buf)?; + Ok(ws_rid) +} + +fn sb_http_middleware(decl: OpDecl) -> OpDecl { + match decl.name { + "op_http_upgrade_websocket" => op_http_upgrade_websocket2::DECL, + _ => decl, + } +} diff --git a/crates/sb_core/http_start.rs b/crates/sb_core/http_start.rs index f60db1cd3..69deda7b8 100644 --- a/crates/sb_core/http_start.rs +++ b/crates/sb_core/http_start.rs @@ -1,94 +1,22 @@ -use std::cell::RefCell; use std::collections::HashMap; use std::os::fd::AsRawFd; use std::os::fd::RawFd; -use std::pin::Pin; use std::rc::Rc; -use std::task::Poll; use deno_core::error::bad_resource; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; +use deno_core::op2; use deno_core::OpState; use deno_core::ResourceId; -use deno_core::{op2, ToJsBuffer}; use deno_http::http_create_conn_resource; use deno_net::io::UnixStreamResource; -use futures::pin_mut; -use futures::ready; -use futures::Future; -use log::error; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::UnixStream; + use tokio::sync::watch; use crate::conn_sync::ConnSync; use crate::conn_sync::ConnWatcher; -use serde::Serialize; - -struct UnixStream2(UnixStream, Option>); - -impl AsyncRead for UnixStream2 { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0).poll_read(cx, buf) - } -} - -impl AsyncWrite for UnixStream2 { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0).poll_write(cx, buf) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0).poll_write_vectored(cx, bufs) - } - - fn is_write_vectored(&self) -> bool { - true - } - - #[inline] - fn poll_flush( - self: Pin<&mut Self>, - _: &mut std::task::Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - if let Some(ref mut sync) = self.1 { - let fut = sync.wait_for(|it| *it == ConnSync::Recv); - - pin_mut!(fut); - ready!(fut.poll(cx).map(|it| { - if let Err(ex) = it { - error!("cannot track outbound connection correctly"); - return Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, ex)); - } - - Ok(()) - }))? - } - - Pin::new(&mut Pin::into_inner(self).0).poll_shutdown(cx) - } -} +use crate::http::UnixStream2; #[op2] #[serde] @@ -114,7 +42,7 @@ fn op_http_start( let addr: std::net::SocketAddr = "0.0.0.0:9999".parse().unwrap(); let conn = http_create_conn_resource( state, - UnixStream2(unix_stream, watcher.clone()), + UnixStream2::new(unix_stream, watcher.clone()), addr, "http", )?; @@ -127,21 +55,4 @@ fn op_http_start( Err(bad_resource_id()) } -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -pub struct HttpUpgradeResult { - conn_rid: ResourceId, - conn_type: &'static str, - read_buf: ToJsBuffer, -} - -#[op2(async)] -#[serde] -async fn op_http_upgrade( - _state: Rc>, - #[smi] _rid: ResourceId, -) -> Result<(), AnyError> { - Ok(()) -} - -deno_core::extension!(sb_core_http, ops = [op_http_start, op_http_upgrade]); +deno_core::extension!(sb_core_http_start, ops = [op_http_start]); diff --git a/crates/sb_core/js/denoOverrides.js b/crates/sb_core/js/denoOverrides.js index 2876bc5d3..f88d4e36c 100644 --- a/crates/sb_core/js/denoOverrides.js +++ b/crates/sb_core/js/denoOverrides.js @@ -3,7 +3,7 @@ import * as tls from 'ext:deno_net/02_tls.js'; import * as timers from 'ext:deno_web/02_timers.js'; import * as permissions from 'ext:sb_core_main_js/js/permissions.js'; import { errors } from 'ext:sb_core_main_js/js/errors.js'; -import { serve, serveHttp } from 'ext:sb_core_main_js/js/http.js'; +import { serve, serveHttp, upgradeWebSocket } from 'ext:sb_core_main_js/js/http.js'; import * as fs from 'ext:deno_fs/30_fs.js'; import { osCalls } from 'ext:sb_os/os.js'; import * as io from 'ext:deno_io/12_io.js'; @@ -95,20 +95,21 @@ const ioVars = { }; const denoOverrides = { + serve, + serveHttp, + upgradeWebSocket, listen: net.listen, connect: net.connect, connectTls: tls.connectTls, startTls: tls.startTls, resolveDns: net.resolveDns, - serveHttp: serveHttp, - serve: serve, permissions: permissions.permissions, Permissions: permissions.Permissions, PermissionStatus: permissions.PermissionStatus, errors: errors, refTimer: timers.refTimer, unrefTimer: timers.unrefTimer, - isatty: (arg) => false, + isatty: (_arg) => false, ...ioVars, ...fsVars, ...osCallsVars, diff --git a/crates/sb_core/lib.rs b/crates/sb_core/lib.rs index 024906958..adfa7c0af 100644 --- a/crates/sb_core/lib.rs +++ b/crates/sb_core/lib.rs @@ -22,6 +22,7 @@ pub mod emit; pub mod errors_rt; pub mod external_memory; pub mod file_fetcher; +pub mod http; pub mod http_start; pub mod net; pub mod permissions; diff --git a/crates/sb_workers/lib.rs b/crates/sb_workers/lib.rs index 19adbc67c..26f569987 100644 --- a/crates/sb_workers/lib.rs +++ b/crates/sb_workers/lib.rs @@ -15,9 +15,12 @@ use deno_core::{ AsyncRefCell, AsyncResult, BufView, ByteString, CancelFuture, CancelHandle, CancelTryFuture, JsBuffer, OpState, RcRef, Resource, ResourceId, WriteOutcome, }; +use deno_http::{HttpRequestReader, HttpStreamResource}; use errors::WorkerError; +use http_utils::utils::get_upgrade_type; use hyper::body::HttpBody; use hyper::header::{HeaderName, HeaderValue, CONTENT_LENGTH}; +use hyper::upgrade::OnUpgrade; use hyper::{Body, Method, Request}; use log::error; use sb_core::conn_sync::{ConnSync, ConnWatcher}; @@ -324,7 +327,7 @@ pub fn op_user_worker_fetch_build( ) -> Result { let method = Method::from_bytes(&req.method)?; - let mut request = Request::builder().uri(req.url).method(&method); + let mut builder = Request::builder().uri(req.url).method(&method); let mut body = Body::empty(); let mut request_body_rid = None; @@ -353,12 +356,12 @@ pub fn op_user_worker_fetch_build( header_value = HeaderValue::from(0); } - request = request.header(header_name, header_value); + builder = builder.header(header_name, header_value); } } - let request = request.body(body)?; - let request_rid = state.resource_table.add(UserWorkerRequestResource(request)); + let req = builder.body(body)?; + let request_rid = state.resource_table.add(UserWorkerRequestResource(req)); Ok(UserWorkerBuiltRequest { request_rid, @@ -375,22 +378,41 @@ pub async fn op_user_worker_fetch_send( #[smi] stream_rid: ResourceId, #[smi] watcher_rid: Option, ) -> Result { - let (tx, request) = { - let mut op_state = state.borrow_mut(); - let tx = op_state - .borrow::>() - .clone(); + let (tx, req) = { + let (tx, mut req) = { + let mut op_state = state.borrow_mut(); + let tx = op_state + .borrow::>() + .clone(); + + let req = Rc::try_unwrap( + op_state + .resource_table + .take::(rid)?, + ) + .ok() + .expect("multiple op_user_worker_fetch_send ongoing"); + + (tx, req) + }; - let request = op_state - .resource_table - .take::(rid)?; + if get_upgrade_type(req.0.headers()).is_some() { + let req_stream = state + .borrow_mut() + .resource_table + .get::(stream_rid)?; - (tx, request) - }; + let mut req_reader_mut = RcRef::map(&req_stream, |r| &r.rd).borrow_mut().await; - let request = Rc::try_unwrap(request) - .ok() - .expect("multiple op_user_worker_fetch_send ongoing"); + if let HttpRequestReader::Headers(orig_req) = &mut *req_reader_mut { + if let Some(upgrade) = orig_req.extensions_mut().remove::() { + let _ = req.0.extensions_mut().insert(upgrade); + } + } + } + + (tx, req) + }; let (result_tx, result_rx) = oneshot::channel::>(); let key_parsed = Uuid::try_parse(key.as_str())?; @@ -417,14 +439,14 @@ pub async fn op_user_worker_fetch_send( tx.send(UserWorkerMsgs::SendRequest( key_parsed, - request.0, + req.0, result_tx, watcher.clone(), ))?; - let result = result_rx.await?; - let (result, req_end_tx) = match result { - Ok((result, req_end_tx)) => (result, req_end_tx), + let res = result_rx.await?; + let (res, req_end_tx) = match res { + Ok((res, req_end_tx)) => (res, req_end_tx), Err(err) => { error!("user worker failed to respond: {}", err); match err.downcast_ref() { @@ -443,24 +465,23 @@ pub async fn op_user_worker_fetch_send( }; let mut headers = vec![]; - for (key, value) in result.headers().iter() { + for (key, value) in res.headers().iter() { headers.push(( ByteString::from(key.as_str()), ByteString::from(value.to_str().unwrap_or_default()), )); } - let status = result.status().as_u16(); - let status_text = result + let status = res.status().as_u16(); + let status_text = res .status() .canonical_reason() .unwrap_or("") .to_string(); - let size = HttpBody::size_hint(result.body()).exact(); + let size = HttpBody::size_hint(res.body()).exact(); let stream: BytesStream = Box::pin( - result - .into_body() + res.into_body() .map(|r| r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))), ); From c297071d52b240f768f2fe7cca307085b78a0e5e Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 19 Feb 2024 01:59:03 +0000 Subject: [PATCH 07/12] stamp: rid duplicate things --- crates/base/src/rt_worker/worker_ctx.rs | 3 ++- crates/base/src/utils.rs | 29 ------------------------- 2 files changed, 2 insertions(+), 30 deletions(-) diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index a25583f52..b86ac7960 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -1,6 +1,6 @@ use crate::deno_runtime::DenoRuntime; +use crate::utils::send_event_if_event_worker_available; use crate::utils::units::bytes_to_display; -use crate::utils::{emit_status_code, get_upgrade_type, send_event_if_event_worker_available}; use crate::rt_worker::worker::{Worker, WorkerHandler}; use crate::rt_worker::worker_pool::WorkerPool; @@ -11,6 +11,7 @@ use event_worker::events::{ }; use http::StatusCode; use http_utils::io::Upgraded2; +use http_utils::utils::{emit_status_code, get_upgrade_type}; use hyper::client::conn::http1; use hyper::upgrade::OnUpgrade; use hyper::{Body, Request, Response}; diff --git a/crates/base/src/utils.rs b/crates/base/src/utils.rs index 0c94fd029..908cf8813 100644 --- a/crates/base/src/utils.rs +++ b/crates/base/src/utils.rs @@ -1,6 +1,4 @@ use event_worker::events::{EventMetadata, WorkerEventWithMetadata, WorkerEvents}; -use http::{header, response, HeaderMap, Response, StatusCode}; -use hyper::Body; use tokio::sync::mpsc; pub mod units; @@ -14,30 +12,3 @@ pub fn send_event_if_event_worker_available( let _ = event_worker.send(WorkerEventWithMetadata { event, metadata }); } } - -pub fn get_upgrade_type(headers: &HeaderMap) -> Option { - let connection_header_exists = headers - .get(header::CONNECTION) - .map(|it| { - it.to_str() - .unwrap_or("") - .split(',') - .any(|str| str.trim() == header::UPGRADE) - }) - .unwrap_or(false); - - if connection_header_exists { - if let Some(upgrade) = headers.get(header::UPGRADE) { - return upgrade.to_str().ok().map(str::to_owned); - } - } - - None -} - -pub fn emit_status_code(status: StatusCode) -> Response { - response::Builder::new() - .status(status) - .body(Body::empty()) - .unwrap() -} From b0316eb1d6c2019da46549d8e74af4c4b5546664 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 19 Feb 2024 02:32:54 +0000 Subject: [PATCH 08/12] stamp(base): add dev dependencies --- Cargo.lock | 34 ++++++++++++++++++++++++++++++++++ crates/base/Cargo.toml | 3 +++ 2 files changed, 37 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 2a9678498..87aa8226e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -302,6 +302,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "async-tungstenite" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef0f8d64ef9351752fbe5462f242c625d9c4910d2bc3f7ec44c43857ca123f5d" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "tungstenite", +] + [[package]] name = "atty" version = "0.2.14" @@ -406,6 +419,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tungstenite", "url", "urlencoding", "uuid", @@ -5994,6 +6008,7 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "futures-util", "hashbrown 0.14.3", @@ -6092,6 +6107,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.0.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index d172140a4..9dbeacf03 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -66,7 +66,10 @@ deno_webgpu.workspace = true sb_ai = { version = "0.1.0", path = "../sb_ai" } [dev-dependencies] +tokio-util = { workspace = true, features = ["rt", "compat"] } serial_test = { version = "3.0.0" } +async-tungstenite = { version = "0.25.0", default-features = false } +tungstenite = { version = "0.21.0", default-features = false, features = ["handshake"] } [build-dependencies] sb_core = { version = "0.1.0", path = "../sb_core" } From ecff7342436d4aa9ac1176dc11197619cbf94262 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 19 Feb 2024 02:33:53 +0000 Subject: [PATCH 09/12] stamp: add websocket upgrade test --- .../test_cases/websocket-upgrade/index.ts | 13 ++++ crates/base/tests/integration_tests.rs | 71 ++++++++++++++++++- 2 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 crates/base/test_cases/websocket-upgrade/index.ts diff --git a/crates/base/test_cases/websocket-upgrade/index.ts b/crates/base/test_cases/websocket-upgrade/index.ts new file mode 100644 index 000000000..f94cbc6b9 --- /dev/null +++ b/crates/base/test_cases/websocket-upgrade/index.ts @@ -0,0 +1,13 @@ +Deno.serve(async (req: Request) => { + const { socket, response } = Deno.upgradeWebSocket(req); + + socket.onopen = () => { + socket.send("meow"); + }; + + socket.onmessage = ev => { + socket.send(ev.data); + }; + + return response; +}); diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 16c67676b..08bde22e2 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -4,16 +4,21 @@ mod integration_test_helper; use std::{collections::HashMap, path::Path, time::Duration}; use anyhow::Context; +use async_tungstenite::WebSocketStream; use base::{ integration_test, rt_worker::worker_ctx::{create_user_worker_pool, create_worker, TerminationToken}, server::ServerEvent, }; +use futures_util::{SinkExt, StreamExt}; use http::{Method, Request, StatusCode}; +use http_utils::utils::get_upgrade_type; use hyper::{body::to_bytes, Body}; use sb_workers::context::{MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRuntimeOpts}; use serial_test::serial; use tokio::{join, sync::mpsc}; +use tokio_util::compat::TokioAsyncReadCompatExt; +use tungstenite::Message; use urlencoding::encode; use crate::integration_test_helper::{ @@ -680,8 +685,10 @@ async fn test_user_worker_json_imports() { let body_bytes = res.bytes().await.unwrap(); assert_eq!(body_bytes, r#"{"version":"1.0.0"}"#); + + term.cancel_and_wait().await; }), - term + term.clone() ); } @@ -919,3 +926,65 @@ async fn req_failure_case_intentional_peer_reset() { term.cancel_and_wait().await; } + +#[tokio::test] +#[serial] +async fn test_websocket_upgrade() { + let term = TerminationToken::new(); + let port = 8498; + let client = reqwest::Client::new(); + let nonce = tungstenite::handshake::client::generate_key(); + let req = client + .request( + Method::GET, + format!("http://localhost:{}/websocket-upgrade", port), + ) + .header(reqwest::header::CONNECTION, "upgrade") + .header(reqwest::header::UPGRADE, "websocket") + .header(reqwest::header::SEC_WEBSOCKET_KEY, &nonce) + .header(reqwest::header::SEC_WEBSOCKET_VERSION, "13") + .build() + .unwrap(); + + let original = reqwest::RequestBuilder::from_parts(client, req); + let request_builder = Some(original); + + integration_test!( + "./test_cases/main", + port, + "", + None, + None, + request_builder, + (|resp: Result| async { + let res = resp.unwrap(); + let accepted = get_upgrade_type(res.headers()); + + assert!(res.status().as_u16() == 101); + assert!(accepted.is_some()); + assert_eq!(accepted.as_ref().unwrap(), "websocket"); + + let upgraded = res.upgrade().await.unwrap(); + let mut ws = WebSocketStream::from_raw_socket( + upgraded.compat(), + tungstenite::protocol::Role::Client, + None, + ) + .await; + + assert_eq!( + ws.next().await.unwrap().unwrap().into_text().unwrap(), + "meow" + ); + + ws.send(Message::Text("meow!!".into())).await.unwrap(); + assert_eq!( + ws.next().await.unwrap().unwrap().into_text().unwrap(), + "meow!!" + ); + + term.cancel_and_wait().await; + }), + term.clone() + ); +} From 5d8be9051c32b3b862283b9b833865707eec7115 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 20 Feb 2024 02:39:14 +0000 Subject: [PATCH 10/12] stamp: add websocket upgrade examples --- examples/oak-ws/index.ts | 33 +++++++++++++++++++++++++++++++++ examples/ws/index.ts | 20 ++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 examples/oak-ws/index.ts create mode 100644 examples/ws/index.ts diff --git a/examples/oak-ws/index.ts b/examples/oak-ws/index.ts new file mode 100644 index 000000000..aa3abc577 --- /dev/null +++ b/examples/oak-ws/index.ts @@ -0,0 +1,33 @@ +import { Application, Router } from "https://deno.land/x/oak@v12.3.0/mod.ts"; + +const router = new Router(); + +router + // Note: path will be prefixed with function name + .get("/oak-ws", ctx => { + if (!ctx.isUpgradable) { + ctx.throw(501); + } + + const ws = ctx.upgrade(); + + ws.onopen = () => { + console.log("Connected to client"); + ws.send("Hello from server!"); + }; + + ws.onmessage = m => { + console.log("Got message from client: ", m.data); + ws.send(m.data as string); + ws.close(); + }; + + ws.onclose = () => console.log("Disconncted from client"); + }) + +const app = new Application(); + +app.use(router.routes()); +app.use(router.allowedMethods()); + +await app.listen(); diff --git a/examples/ws/index.ts b/examples/ws/index.ts new file mode 100644 index 000000000..fe495accb --- /dev/null +++ b/examples/ws/index.ts @@ -0,0 +1,20 @@ +Deno.serve(req => { + const upgrade = req.headers.get("upgrade") || ""; + + if (upgrade.toLowerCase() != "websocket") { + return new Response("request isn't trying to upgrade to websocket."); + } + + const { socket, response } = Deno.upgradeWebSocket(req); + + socket.onopen = () => console.log("socket opened"); + socket.onmessage = (e) => { + console.log("socket message:", e.data); + socket.send(new Date().toString()); + }; + + socket.onerror = e => console.log("socket errored:", e.message); + socket.onclose = () => console.log("socket closed"); + + return response; +}); \ No newline at end of file From 6349ec956a58e1f36a2b8a70745c8b5b9c0ea472 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sat, 24 Feb 2024 04:37:33 +0000 Subject: [PATCH 11/12] stamp: add websocket upgrade example for main worker --- examples/main/index.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/examples/main/index.ts b/examples/main/index.ts index c0305d915..758719db1 100644 --- a/examples/main/index.ts +++ b/examples/main/index.ts @@ -19,6 +19,28 @@ serve(async (req: Request) => { return Response.json(metric); } + // NOTE: You can test WebSocket in the main worker by uncommenting below. + // if (pathname === '/_internal/ws') { + // const upgrade = req.headers.get("upgrade") || ""; + + // if (upgrade.toLowerCase() != "websocket") { + // return new Response("request isn't trying to upgrade to websocket."); + // } + + // const { socket, response } = Deno.upgradeWebSocket(req); + + // socket.onopen = () => console.log("socket opened"); + // socket.onmessage = (e) => { + // console.log("socket message:", e.data); + // socket.send(new Date().toString()); + // }; + + // socket.onerror = e => console.log("socket errored:", e.message); + // socket.onclose = () => console.log("socket closed"); + + // return response; // 101 (Switching Protocols) + // } + const path_parts = pathname.split('/'); const service_name = path_parts[1]; From 91fefc0a8b7d483a09cdd4b991ad071af2870b24 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Sat, 24 Feb 2024 05:32:51 +0000 Subject: [PATCH 12/12] stamp(http_utils): add license header --- crates/http_utils/src/io/rewind.rs | 3 +++ crates/http_utils/src/io/upgraded2.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/crates/http_utils/src/io/rewind.rs b/crates/http_utils/src/io/rewind.rs index 7940594a1..dbf7ec053 100644 --- a/crates/http_utils/src/io/rewind.rs +++ b/crates/http_utils/src/io/rewind.rs @@ -1,3 +1,6 @@ +// Copyright 2014-2021 Sean McArthur +// SPDX-License-Identifier: MIT + use std::marker::Unpin; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/crates/http_utils/src/io/upgraded2.rs b/crates/http_utils/src/io/upgraded2.rs index 155ec01e6..d338abd55 100644 --- a/crates/http_utils/src/io/upgraded2.rs +++ b/crates/http_utils/src/io/upgraded2.rs @@ -1,3 +1,6 @@ +// Copyright 2014-2021 Sean McArthur +// SPDX-License-Identifier: MIT + use std::{ fmt, io, pin::Pin,