diff --git a/.changeset/lucky-dots-obey.md b/.changeset/lucky-dots-obey.md new file mode 100644 index 000000000..09160586f --- /dev/null +++ b/.changeset/lucky-dots-obey.md @@ -0,0 +1,5 @@ +--- +"open-next": minor +--- + +Replace InternalResult body from string to ReadableStream diff --git a/packages/open-next/src/adapters/edge-adapter.ts b/packages/open-next/src/adapters/edge-adapter.ts index 09470a8ac..3c4fecbe9 100644 --- a/packages/open-next/src/adapters/edge-adapter.ts +++ b/packages/open-next/src/adapters/edge-adapter.ts @@ -1,4 +1,7 @@ -import { InternalEvent, InternalResult } from "types/open-next"; +import type { ReadableStream } from "node:stream/web"; + +import type { InternalEvent, InternalResult } from "types/open-next"; +import { emptyReadableStream } from "utils/stream"; // We import it like that so that the edge plugin can replace it import { NextConfig } from "../adapters/config"; @@ -8,11 +11,14 @@ import { convertToQueryString, } from "../core/routing/util"; +declare global { + var isEdgeRuntime: true; +} + const defaultHandler = async ( internalEvent: InternalEvent, ): Promise => { - // TODO: We need to handle splitted function here - // We should probably create an host resolver to redirect correctly + globalThis.isEdgeRuntime = true; const host = internalEvent.headers.host ? `https://${internalEvent.headers.host}` @@ -35,10 +41,6 @@ const defaultHandler = async ( url, body: convertBodyToReadableStream(internalEvent.method, internalEvent.body), }); - - const arrayBuffer = await response.arrayBuffer(); - const buffer = Buffer.from(arrayBuffer); - const responseHeaders: Record = {}; response.headers.forEach((value, key) => { if (key.toLowerCase() === "set-cookie") { @@ -49,9 +51,9 @@ const defaultHandler = async ( responseHeaders[key] = value; } }); - // console.log("responseHeaders", responseHeaders); - const body = buffer.toString(); - // console.log("body", body); + + const body = + (response.body as ReadableStream) ?? emptyReadableStream(); return { type: "core", diff --git a/packages/open-next/src/adapters/image-optimization-adapter.ts b/packages/open-next/src/adapters/image-optimization-adapter.ts index a589c3391..d57aa5cde 100644 --- a/packages/open-next/src/adapters/image-optimization-adapter.ts +++ b/packages/open-next/src/adapters/image-optimization-adapter.ts @@ -19,6 +19,7 @@ import { // @ts-ignore import type { NextUrlWithParsedQuery } from "next/dist/server/request-meta"; import { InternalEvent, InternalResult } from "types/open-next.js"; +import { emptyReadableStream, toReadableStream } from "utils/stream.js"; import { createGenericHandler } from "../core/createGenericHandler.js"; import { resolveImageLoader } from "../core/resolve.js"; @@ -82,7 +83,7 @@ export async function defaultHandler( return { statusCode: 304, headers: {}, - body: "", + body: emptyReadableStream(), isBase64Encoded: false, type: "core", }; @@ -169,7 +170,7 @@ function buildSuccessResponse( return { type: "core", statusCode: 200, - body: result.buffer.toString("base64"), + body: toReadableStream(result.buffer, true), isBase64Encoded: true, headers, }; @@ -191,7 +192,7 @@ function buildFailureResponse( "Cache-Control": `public,max-age=60,immutable`, "Content-Type": "application/json", }); - response.end(e?.message || e?.toString() || e); + response.end(e?.message || e?.toString() || "An error occurred"); } return { type: "core", @@ -203,7 +204,7 @@ function buildFailureResponse( "Cache-Control": `public,max-age=60,immutable`, "Content-Type": "application/json", }, - body: e?.message || e?.toString() || e, + body: toReadableStream(e?.message || e?.toString() || "An error occurred"), }; } diff --git a/packages/open-next/src/converters/aws-apigw-v1.ts b/packages/open-next/src/converters/aws-apigw-v1.ts index dabd88bcc..c5e1fcfa6 100644 --- a/packages/open-next/src/converters/aws-apigw-v1.ts +++ b/packages/open-next/src/converters/aws-apigw-v1.ts @@ -1,5 +1,6 @@ import { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda"; import type { Converter, InternalEvent, InternalResult } from "types/open-next"; +import { fromReadableStream } from "utils/stream"; import { debug } from "../adapters/logger"; import { removeUndefinedFromQuery } from "./utils"; @@ -74,9 +75,9 @@ async function convertFromAPIGatewayProxyEvent( }; } -function convertToApiGatewayProxyResult( +async function convertToApiGatewayProxyResult( result: InternalResult, -): APIGatewayProxyResult { +): Promise { const headers: Record = {}; const multiValueHeaders: Record = {}; Object.entries(result.headers).forEach(([key, value]) => { @@ -91,10 +92,12 @@ function convertToApiGatewayProxyResult( } }); + const body = await fromReadableStream(result.body, result.isBase64Encoded); + const response: APIGatewayProxyResult = { statusCode: result.statusCode, headers, - body: result.body, + body, isBase64Encoded: result.isBase64Encoded, multiValueHeaders, }; diff --git a/packages/open-next/src/converters/aws-apigw-v2.ts b/packages/open-next/src/converters/aws-apigw-v2.ts index 58c8e7e1a..8a30d2ad5 100644 --- a/packages/open-next/src/converters/aws-apigw-v2.ts +++ b/packages/open-next/src/converters/aws-apigw-v2.ts @@ -1,6 +1,7 @@ import { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda"; import { parseCookies } from "http/util"; import type { Converter, InternalEvent, InternalResult } from "types/open-next"; +import { fromReadableStream } from "utils/stream"; import { debug } from "../adapters/logger"; import { convertToQuery } from "../core/routing/util"; @@ -85,9 +86,9 @@ async function convertFromAPIGatewayProxyEventV2( }; } -function convertToApiGatewayProxyResultV2( +async function convertToApiGatewayProxyResultV2( result: InternalResult, -): APIGatewayProxyResultV2 { +): Promise { const headers: Record = {}; Object.entries(result.headers) .filter( @@ -104,11 +105,13 @@ function convertToApiGatewayProxyResultV2( headers[key] = Array.isArray(value) ? value.join(", ") : `${value}`; }); + const body = await fromReadableStream(result.body, result.isBase64Encoded); + const response: APIGatewayProxyResultV2 = { statusCode: result.statusCode, headers, cookies: parseCookies(result.headers["set-cookie"]), - body: result.body, + body, isBase64Encoded: result.isBase64Encoded, }; debug(response); diff --git a/packages/open-next/src/converters/aws-cloudfront.ts b/packages/open-next/src/converters/aws-cloudfront.ts index d3f4dc792..440cbd23a 100644 --- a/packages/open-next/src/converters/aws-cloudfront.ts +++ b/packages/open-next/src/converters/aws-cloudfront.ts @@ -8,6 +8,7 @@ import { import { OutgoingHttpHeader } from "http"; import { parseCookies } from "http/util"; import type { Converter, InternalEvent, InternalResult } from "types/open-next"; +import { fromReadableStream } from "utils/stream"; import { debug } from "../adapters/logger"; import { @@ -159,6 +160,10 @@ async function convertToCloudFrontRequestResult( const serverResponse = createServerResponse(result.internalEvent, {}); await proxyRequest(result.internalEvent, serverResponse); const externalResult = convertRes(serverResponse); + const body = await fromReadableStream( + externalResult.body, + externalResult.isBase64Encoded, + ); const cloudfrontResult = { status: externalResult.statusCode.toString(), statusDescription: "OK", @@ -166,7 +171,7 @@ async function convertToCloudFrontRequestResult( bodyEncoding: externalResult.isBase64Encoded ? ("base64" as const) : ("text" as const), - body: externalResult.body, + body, }; debug("externalResult", cloudfrontResult); return cloudfrontResult; @@ -208,12 +213,14 @@ async function convertToCloudFrontRequestResult( return response; } + const body = await fromReadableStream(result.body, result.isBase64Encoded); + const response: CloudFrontRequestResult = { status: result.statusCode.toString(), statusDescription: "OK", headers: convertToCloudfrontHeaders(responseHeaders, true), bodyEncoding: result.isBase64Encoded ? "base64" : "text", - body: result.body, + body, }; debug(response); return response; diff --git a/packages/open-next/src/converters/edge.ts b/packages/open-next/src/converters/edge.ts index aca0d42f2..7341c7486 100644 --- a/packages/open-next/src/converters/edge.ts +++ b/packages/open-next/src/converters/edge.ts @@ -90,7 +90,7 @@ const converter: Converter< for (const [key, value] of Object.entries(result.headers)) { headers.set(key, Array.isArray(value) ? value.join(",") : value); } - return new Response(result.body, { + return new Response(result.body as ReadableStream, { status: result.statusCode, headers: headers, }); diff --git a/packages/open-next/src/converters/node.ts b/packages/open-next/src/converters/node.ts index 662c0cb11..ec3e28641 100644 --- a/packages/open-next/src/converters/node.ts +++ b/packages/open-next/src/converters/node.ts @@ -44,7 +44,7 @@ const converter: Converter = { }; }, // Nothing to do here, it's streaming - convertTo: (internalResult: InternalResult) => ({ + convertTo: async (internalResult: InternalResult) => ({ body: internalResult.body, headers: internalResult.headers, statusCode: internalResult.statusCode, diff --git a/packages/open-next/src/core/requestHandler.ts b/packages/open-next/src/core/requestHandler.ts index 9442414d1..0e83cc0e1 100644 --- a/packages/open-next/src/core/requestHandler.ts +++ b/packages/open-next/src/core/requestHandler.ts @@ -7,6 +7,7 @@ import { } from "http/index.js"; import { InternalEvent, InternalResult } from "types/open-next"; import { DetachedPromiseRunner } from "utils/promise"; +import { fromReadableStream } from "utils/stream"; import { debug, error, warn } from "../adapters/logger"; import { convertRes, createServerResponse, proxyRequest } from "./routing/util"; @@ -63,12 +64,22 @@ export async function openNextHandler( }, {}); if ("type" in preprocessResult) { - // res is used only in the streaming case - const res = createServerResponse(internalEvent, headers, responseStreaming); - res.statusCode = preprocessResult.statusCode; - res.flushHeaders(); - res.write(preprocessResult.body); - res.end(); + // // res is used only in the streaming case + if (responseStreaming) { + const res = createServerResponse( + internalEvent, + headers, + responseStreaming, + ); + res.statusCode = preprocessResult.statusCode; + res.flushHeaders(); + const body = await fromReadableStream( + preprocessResult.body, + preprocessResult.isBase64Encoded, + ); + res.write(body); + res.end(); + } return preprocessResult; } else { const preprocessedEvent = preprocessResult.internalEvent; diff --git a/packages/open-next/src/core/routing/matcher.ts b/packages/open-next/src/core/routing/matcher.ts index 910f0368d..245629aeb 100644 --- a/packages/open-next/src/core/routing/matcher.ts +++ b/packages/open-next/src/core/routing/matcher.ts @@ -14,6 +14,7 @@ import type { RouteHas, } from "types/next-types"; import { InternalEvent, InternalResult } from "types/open-next"; +import { emptyReadableStream, toReadableStream } from "utils/stream"; import { debug } from "../../adapters/logger"; import { localizePath } from "./i18n"; @@ -243,8 +244,11 @@ export function handleRewrites( }; } -function handleTrailingSlashRedirect(event: InternalEvent) { +function handleTrailingSlashRedirect( + event: InternalEvent, +): false | InternalResult { const url = new URL(event.url, "http://localhost"); + const emptyBody = emptyReadableStream(); if ( // Someone is trying to redirect to a different origin, let's not do that @@ -270,7 +274,7 @@ function handleTrailingSlashRedirect(event: InternalEvent) { headersLocation[1] ? `?${headersLocation[1]}` : "" }`, }, - body: "", + body: emptyBody, isBase64Encoded: false, }; // eslint-disable-next-line sonarjs/elseif-without-else @@ -288,7 +292,7 @@ function handleTrailingSlashRedirect(event: InternalEvent) { headersLocation[1] ? `?${headersLocation[1]}` : "" }`, }, - body: "", + body: emptyBody, isBase64Encoded: false, }; } else return false; @@ -311,7 +315,7 @@ export function handleRedirects( headers: { Location: internalEvent.url, }, - body: "", + body: emptyReadableStream(), isBase64Encoded: false, }; } @@ -328,7 +332,7 @@ export function fixDataPage( return { type: internalEvent.type, statusCode: 404, - body: "{}", + body: toReadableStream("{}"), headers: { "Content-Type": "application/json", }, diff --git a/packages/open-next/src/core/routing/middleware.ts b/packages/open-next/src/core/routing/middleware.ts index c1a7e89c9..fcf83b501 100644 --- a/packages/open-next/src/core/routing/middleware.ts +++ b/packages/open-next/src/core/routing/middleware.ts @@ -1,5 +1,8 @@ +import { ReadableStream } from "node:stream/web"; + import { MiddlewareManifest, NextConfig } from "config/index.js"; import { InternalEvent, InternalResult } from "types/open-next.js"; +import { emptyReadableStream } from "utils/stream.js"; //NOTE: we should try to avoid importing stuff from next as much as possible // every release of next could break this @@ -137,7 +140,7 @@ export async function handleMiddleware( ) ?? resHeaders.location; // res.setHeader("Location", location); return { - body: "", + body: emptyReadableStream(), type: internalEvent.type, statusCode: statusCode, headers: resHeaders, @@ -182,16 +185,14 @@ export async function handleMiddleware( // the body immediately to the client. if (result.body) { // transfer response body to res - const arrayBuffer = await result.arrayBuffer(); - const buffer = Buffer.from(arrayBuffer); - // res.end(buffer); + const body = result.body as ReadableStream; // await pipeReadable(result.response.body, res); return { type: internalEvent.type, statusCode: statusCode, headers: resHeaders, - body: buffer.toString(), + body, isBase64Encoded: false, }; } diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index fb9a5e399..3c2ac43ab 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -1,12 +1,13 @@ import crypto from "node:crypto"; import { OutgoingHttpHeaders } from "node:http"; +import { Readable } from "node:stream"; import { BuildId, HtmlPages } from "config/index.js"; import type { IncomingMessage, StreamCreator } from "http/index.js"; import { OpenNextNodeResponse } from "http/openNextResponse.js"; import { parseHeaders } from "http/util.js"; import type { MiddlewareManifest } from "types/next-types"; -import { InternalEvent } from "types/open-next.js"; +import { InternalEvent, InternalResult } from "types/open-next.js"; import { isBinaryContentType } from "../../adapters/binary.js"; import { debug, error } from "../../adapters/logger.js"; @@ -69,7 +70,7 @@ export function getUrlParts(url: string, isExternal: boolean) { * * @__PURE__ */ -export function convertRes(res: OpenNextNodeResponse) { +export function convertRes(res: OpenNextNodeResponse): InternalResult { // Format Next.js response to Lambda response const statusCode = res.statusCode || 200; // When using HEAD requests, it seems that flushHeaders is not called, not sure why @@ -80,9 +81,9 @@ export function convertRes(res: OpenNextNodeResponse) { ? headers["content-type"][0] : headers["content-type"], ); - const encoding = isBase64Encoded ? "base64" : "utf8"; - const body = res.getBody().toString(encoding); + const body = Readable.toWeb(res); return { + type: "core", statusCode, headers, body, diff --git a/packages/open-next/src/types/open-next.ts b/packages/open-next/src/types/open-next.ts index a58baefed..eb1187256 100644 --- a/packages/open-next/src/types/open-next.ts +++ b/packages/open-next/src/types/open-next.ts @@ -1,4 +1,5 @@ import type { Readable } from "node:stream"; +import type { ReadableStream } from "node:stream/web"; import type { StreamCreator } from "http/index.js"; @@ -25,7 +26,7 @@ export type InternalEvent = { export type InternalResult = { statusCode: number; headers: Record; - body: string; + body: ReadableStream; isBase64Encoded: boolean; } & BaseEventOrResult<"core">; @@ -58,7 +59,7 @@ export type Converter< R extends BaseEventOrResult = InternalResult, > = BaseOverride & { convertFrom: (event: any) => Promise; - convertTo: (result: R, originalRequest?: any) => any; + convertTo: (result: R, originalRequest?: any) => Promise; }; export type WrapperHandler< diff --git a/packages/open-next/src/utils/stream.ts b/packages/open-next/src/utils/stream.ts new file mode 100644 index 000000000..77363edc4 --- /dev/null +++ b/packages/open-next/src/utils/stream.ts @@ -0,0 +1,43 @@ +import { Readable } from "node:stream"; +import type { ReadableStream } from "node:stream/web"; + +export function fromReadableStream( + stream: ReadableStream, + base64?: boolean, +): Promise { + const reader = stream.getReader(); + const chunks: Uint8Array[] = []; + + return new Promise((resolve, reject) => { + function pump() { + reader + .read() + .then(({ done, value }) => { + if (done) { + resolve(Buffer.concat(chunks).toString(base64 ? "base64" : "utf8")); + return; + } + chunks.push(value); + pump(); + }) + .catch(reject); + } + pump(); + }); +} + +export function toReadableStream( + value: string, + isBase64?: boolean, +): ReadableStream { + return Readable.toWeb( + Readable.from(Buffer.from(value, isBase64 ? "base64" : "utf8")), + ); +} + +export function emptyReadableStream(): ReadableStream { + if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") { + return Readable.toWeb(Readable.from(["SOMETHING"])); + } + return Readable.toWeb(Readable.from([])); +} diff --git a/packages/open-next/src/wrappers/aws-lambda-streaming.ts b/packages/open-next/src/wrappers/aws-lambda-streaming.ts index ba046055d..891246a38 100644 --- a/packages/open-next/src/wrappers/aws-lambda-streaming.ts +++ b/packages/open-next/src/wrappers/aws-lambda-streaming.ts @@ -1,11 +1,11 @@ -import { Writable } from "node:stream"; +import { Readable, Writable } from "node:stream"; import zlib from "node:zlib"; import { APIGatewayProxyEventV2 } from "aws-lambda"; import { StreamCreator } from "http/index.js"; import { WrapperHandler } from "types/open-next"; -import { error } from "../adapters/logger"; +import { debug, error } from "../adapters/logger"; import { WarmerEvent, WarmerResponse } from "../adapters/warmer-function"; type AwsLambdaEvent = APIGatewayProxyEventV2 | WarmerEvent; @@ -35,6 +35,8 @@ const handler: WrapperHandler = async (handler, converter) => return; } + let headersWritten = false; + const internalEvent = await converter.convertFrom(event); //Handle compression @@ -89,6 +91,7 @@ const handler: WrapperHandler = async (handler, converter) => responseStream.write(prelude); responseStream.write(new Uint8Array(8)); + headersWritten = true; return compressedStream ?? responseStream; }, @@ -100,7 +103,18 @@ const handler: WrapperHandler = async (handler, converter) => const response = await handler(internalEvent, streamCreator); - return converter.convertTo(response); + const isUsingEdge = globalThis.isEdgeRuntime ?? false; + if (isUsingEdge) { + debug("Headers has not been set, we must be in the edge runtime"); + const stream = streamCreator.writeHeaders({ + statusCode: response.statusCode, + headers: response.headers as Record, + cookies: [], + }); + Readable.fromWeb(response.body).pipe(stream); + } + + // return converter.convertTo(response); }, ); diff --git a/packages/open-next/src/wrappers/aws-lambda.ts b/packages/open-next/src/wrappers/aws-lambda.ts index 8e5785d5a..3477e9ff0 100644 --- a/packages/open-next/src/wrappers/aws-lambda.ts +++ b/packages/open-next/src/wrappers/aws-lambda.ts @@ -6,8 +6,6 @@ import type { CloudFrontRequestEvent, CloudFrontRequestResult, } from "aws-lambda"; -import { StreamCreator } from "http/openNextResponse"; -import { Writable } from "stream"; import type { WrapperHandler } from "types/open-next"; import { WarmerEvent, WarmerResponse } from "../adapters/warmer-function"; @@ -42,25 +40,7 @@ const handler: WrapperHandler = const internalEvent = await converter.convertFrom(event); - //TODO: create a simple reproduction and open an issue in the node repo - //This is a workaround, there is an issue in node that causes node to crash silently if the OpenNextNodeResponse stream is not consumed - //This does not happen everytime, it's probably caused by suspended component in ssr (either via or loading.tsx) - //Everyone that wish to create their own wrapper without a StreamCreator should implement this workaround - //This is not necessary if the underlying handler does not use OpenNextNodeResponse (At the moment, OpenNextNodeResponse is used by the node runtime servers and the image server) - const fakeStream: StreamCreator = { - writeHeaders: () => { - return new Writable({ - write: (_chunk, _encoding, callback) => { - callback(); - }, - }); - }, - onFinish: () => { - // Do nothing - }, - }; - - const response = await handler(internalEvent, fakeStream); + const response = await handler(internalEvent); return converter.convertTo(response, event); };