Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lucky-dots-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"open-next": minor
---

Replace InternalResult body from string to ReadableStream
22 changes: 12 additions & 10 deletions packages/open-next/src/adapters/edge-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { InternalEvent, InternalResult } from "types/open-next";
import type { ReadableStream } from "node:stream/web";

import type { InternalEvent, InternalResult } from "types/open-next";
import { emptyReadableStream } from "utils/stream";

// We import it like that so that the edge plugin can replace it
import { NextConfig } from "../adapters/config";
Expand All @@ -8,11 +11,14 @@ import {
convertToQueryString,
} from "../core/routing/util";

declare global {
var isEdgeRuntime: true;
}

const defaultHandler = async (
internalEvent: InternalEvent,
): Promise<InternalResult> => {
// TODO: We need to handle splitted function here
// We should probably create an host resolver to redirect correctly
globalThis.isEdgeRuntime = true;

const host = internalEvent.headers.host
? `https://${internalEvent.headers.host}`
Expand All @@ -35,10 +41,6 @@ const defaultHandler = async (
url,
body: convertBodyToReadableStream(internalEvent.method, internalEvent.body),
});

const arrayBuffer = await response.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);

const responseHeaders: Record<string, string | string[]> = {};
response.headers.forEach((value, key) => {
if (key.toLowerCase() === "set-cookie") {
Expand All @@ -49,9 +51,9 @@ const defaultHandler = async (
responseHeaders[key] = value;
}
});
// console.log("responseHeaders", responseHeaders);
const body = buffer.toString();
// console.log("body", body);

const body =
(response.body as ReadableStream<Uint8Array>) ?? emptyReadableStream();

return {
type: "core",
Expand Down
9 changes: 5 additions & 4 deletions packages/open-next/src/adapters/image-optimization-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
// @ts-ignore
import type { NextUrlWithParsedQuery } from "next/dist/server/request-meta";
import { InternalEvent, InternalResult } from "types/open-next.js";
import { emptyReadableStream, toReadableStream } from "utils/stream.js";

import { createGenericHandler } from "../core/createGenericHandler.js";
import { resolveImageLoader } from "../core/resolve.js";
Expand Down Expand Up @@ -82,7 +83,7 @@ export async function defaultHandler(
return {
statusCode: 304,
headers: {},
body: "",
body: emptyReadableStream(),
isBase64Encoded: false,
type: "core",
};
Expand Down Expand Up @@ -169,7 +170,7 @@ function buildSuccessResponse(
return {
type: "core",
statusCode: 200,
body: result.buffer.toString("base64"),
body: toReadableStream(result.buffer, true),
isBase64Encoded: true,
headers,
};
Expand All @@ -191,7 +192,7 @@ function buildFailureResponse(
"Cache-Control": `public,max-age=60,immutable`,
"Content-Type": "application/json",
});
response.end(e?.message || e?.toString() || e);
response.end(e?.message || e?.toString() || "An error occurred");
}
return {
type: "core",
Expand All @@ -203,7 +204,7 @@ function buildFailureResponse(
"Cache-Control": `public,max-age=60,immutable`,
"Content-Type": "application/json",
},
body: e?.message || e?.toString() || e,
body: toReadableStream(e?.message || e?.toString() || "An error occurred"),
};
}

Expand Down
9 changes: 6 additions & 3 deletions packages/open-next/src/converters/aws-apigw-v1.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda";
import type { Converter, InternalEvent, InternalResult } from "types/open-next";
import { fromReadableStream } from "utils/stream";

import { debug } from "../adapters/logger";
import { removeUndefinedFromQuery } from "./utils";
Expand Down Expand Up @@ -74,9 +75,9 @@ async function convertFromAPIGatewayProxyEvent(
};
}

function convertToApiGatewayProxyResult(
async function convertToApiGatewayProxyResult(
result: InternalResult,
): APIGatewayProxyResult {
): Promise<APIGatewayProxyResult> {
const headers: Record<string, string> = {};
const multiValueHeaders: Record<string, string[]> = {};
Object.entries(result.headers).forEach(([key, value]) => {
Expand All @@ -91,10 +92,12 @@ function convertToApiGatewayProxyResult(
}
});

const body = await fromReadableStream(result.body, result.isBase64Encoded);

const response: APIGatewayProxyResult = {
statusCode: result.statusCode,
headers,
body: result.body,
body,
isBase64Encoded: result.isBase64Encoded,
multiValueHeaders,
};
Expand Down
9 changes: 6 additions & 3 deletions packages/open-next/src/converters/aws-apigw-v2.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda";
import { parseCookies } from "http/util";
import type { Converter, InternalEvent, InternalResult } from "types/open-next";
import { fromReadableStream } from "utils/stream";

import { debug } from "../adapters/logger";
import { convertToQuery } from "../core/routing/util";
Expand Down Expand Up @@ -85,9 +86,9 @@ async function convertFromAPIGatewayProxyEventV2(
};
}

function convertToApiGatewayProxyResultV2(
async function convertToApiGatewayProxyResultV2(
result: InternalResult,
): APIGatewayProxyResultV2 {
): Promise<APIGatewayProxyResultV2> {
const headers: Record<string, string> = {};
Object.entries(result.headers)
.filter(
Expand All @@ -104,11 +105,13 @@ function convertToApiGatewayProxyResultV2(
headers[key] = Array.isArray(value) ? value.join(", ") : `${value}`;
});

const body = await fromReadableStream(result.body, result.isBase64Encoded);

const response: APIGatewayProxyResultV2 = {
statusCode: result.statusCode,
headers,
cookies: parseCookies(result.headers["set-cookie"]),
body: result.body,
body,
isBase64Encoded: result.isBase64Encoded,
};
debug(response);
Expand Down
11 changes: 9 additions & 2 deletions packages/open-next/src/converters/aws-cloudfront.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import { OutgoingHttpHeader } from "http";
import { parseCookies } from "http/util";
import type { Converter, InternalEvent, InternalResult } from "types/open-next";
import { fromReadableStream } from "utils/stream";

import { debug } from "../adapters/logger";
import {
Expand Down Expand Up @@ -159,14 +160,18 @@ async function convertToCloudFrontRequestResult(
const serverResponse = createServerResponse(result.internalEvent, {});
await proxyRequest(result.internalEvent, serverResponse);
const externalResult = convertRes(serverResponse);
const body = await fromReadableStream(
externalResult.body,
externalResult.isBase64Encoded,
);
const cloudfrontResult = {
status: externalResult.statusCode.toString(),
statusDescription: "OK",
headers: convertToCloudfrontHeaders(externalResult.headers, true),
bodyEncoding: externalResult.isBase64Encoded
? ("base64" as const)
: ("text" as const),
body: externalResult.body,
body,
};
debug("externalResult", cloudfrontResult);
return cloudfrontResult;
Expand Down Expand Up @@ -208,12 +213,14 @@ async function convertToCloudFrontRequestResult(
return response;
}

const body = await fromReadableStream(result.body, result.isBase64Encoded);

const response: CloudFrontRequestResult = {
status: result.statusCode.toString(),
statusDescription: "OK",
headers: convertToCloudfrontHeaders(responseHeaders, true),
bodyEncoding: result.isBase64Encoded ? "base64" : "text",
body: result.body,
body,
};
debug(response);
return response;
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/converters/edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ const converter: Converter<
for (const [key, value] of Object.entries(result.headers)) {
headers.set(key, Array.isArray(value) ? value.join(",") : value);
}
return new Response(result.body, {
return new Response(result.body as ReadableStream, {
status: result.statusCode,
headers: headers,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/converters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const converter: Converter = {
};
},
// Nothing to do here, it's streaming
convertTo: (internalResult: InternalResult) => ({
convertTo: async (internalResult: InternalResult) => ({
body: internalResult.body,
headers: internalResult.headers,
statusCode: internalResult.statusCode,
Expand Down
23 changes: 17 additions & 6 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from "http/index.js";
import { InternalEvent, InternalResult } from "types/open-next";
import { DetachedPromiseRunner } from "utils/promise";
import { fromReadableStream } from "utils/stream";

import { debug, error, warn } from "../adapters/logger";
import { convertRes, createServerResponse, proxyRequest } from "./routing/util";
Expand Down Expand Up @@ -63,12 +64,22 @@ export async function openNextHandler(
}, {});

if ("type" in preprocessResult) {
// res is used only in the streaming case
const res = createServerResponse(internalEvent, headers, responseStreaming);
res.statusCode = preprocessResult.statusCode;
res.flushHeaders();
res.write(preprocessResult.body);
res.end();
// // res is used only in the streaming case
if (responseStreaming) {
const res = createServerResponse(
internalEvent,
headers,
responseStreaming,
);
res.statusCode = preprocessResult.statusCode;
res.flushHeaders();
const body = await fromReadableStream(
preprocessResult.body,
preprocessResult.isBase64Encoded,
);
res.write(body);
res.end();
}
return preprocessResult;
} else {
const preprocessedEvent = preprocessResult.internalEvent;
Expand Down
14 changes: 9 additions & 5 deletions packages/open-next/src/core/routing/matcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
RouteHas,
} from "types/next-types";
import { InternalEvent, InternalResult } from "types/open-next";
import { emptyReadableStream, toReadableStream } from "utils/stream";

import { debug } from "../../adapters/logger";
import { localizePath } from "./i18n";
Expand Down Expand Up @@ -243,8 +244,11 @@ export function handleRewrites<T extends RewriteDefinition>(
};
}

function handleTrailingSlashRedirect(event: InternalEvent) {
function handleTrailingSlashRedirect(
event: InternalEvent,
): false | InternalResult {
const url = new URL(event.url, "http://localhost");
const emptyBody = emptyReadableStream();

if (
// Someone is trying to redirect to a different origin, let's not do that
Expand All @@ -270,7 +274,7 @@ function handleTrailingSlashRedirect(event: InternalEvent) {
headersLocation[1] ? `?${headersLocation[1]}` : ""
}`,
},
body: "",
body: emptyBody,
isBase64Encoded: false,
};
// eslint-disable-next-line sonarjs/elseif-without-else
Expand All @@ -288,7 +292,7 @@ function handleTrailingSlashRedirect(event: InternalEvent) {
headersLocation[1] ? `?${headersLocation[1]}` : ""
}`,
},
body: "",
body: emptyBody,
isBase64Encoded: false,
};
} else return false;
Expand All @@ -311,7 +315,7 @@ export function handleRedirects(
headers: {
Location: internalEvent.url,
},
body: "",
body: emptyReadableStream(),
isBase64Encoded: false,
};
}
Expand All @@ -328,7 +332,7 @@ export function fixDataPage(
return {
type: internalEvent.type,
statusCode: 404,
body: "{}",
body: toReadableStream("{}"),
headers: {
"Content-Type": "application/json",
},
Expand Down
11 changes: 6 additions & 5 deletions packages/open-next/src/core/routing/middleware.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { ReadableStream } from "node:stream/web";

import { MiddlewareManifest, NextConfig } from "config/index.js";
import { InternalEvent, InternalResult } from "types/open-next.js";
import { emptyReadableStream } from "utils/stream.js";

//NOTE: we should try to avoid importing stuff from next as much as possible
// every release of next could break this
Expand Down Expand Up @@ -137,7 +140,7 @@ export async function handleMiddleware(
) ?? resHeaders.location;
// res.setHeader("Location", location);
return {
body: "",
body: emptyReadableStream(),
type: internalEvent.type,
statusCode: statusCode,
headers: resHeaders,
Expand Down Expand Up @@ -182,16 +185,14 @@ export async function handleMiddleware(
// the body immediately to the client.
if (result.body) {
// transfer response body to res
const arrayBuffer = await result.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);
// res.end(buffer);
const body = result.body as ReadableStream<Uint8Array>;

// await pipeReadable(result.response.body, res);
return {
type: internalEvent.type,
statusCode: statusCode,
headers: resHeaders,
body: buffer.toString(),
body,
isBase64Encoded: false,
};
}
Expand Down
9 changes: 5 additions & 4 deletions packages/open-next/src/core/routing/util.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import crypto from "node:crypto";
import { OutgoingHttpHeaders } from "node:http";
import { Readable } from "node:stream";

import { BuildId, HtmlPages } from "config/index.js";
import type { IncomingMessage, StreamCreator } from "http/index.js";
import { OpenNextNodeResponse } from "http/openNextResponse.js";
import { parseHeaders } from "http/util.js";
import type { MiddlewareManifest } from "types/next-types";
import { InternalEvent } from "types/open-next.js";
import { InternalEvent, InternalResult } from "types/open-next.js";

import { isBinaryContentType } from "../../adapters/binary.js";
import { debug, error } from "../../adapters/logger.js";
Expand Down Expand Up @@ -69,7 +70,7 @@ export function getUrlParts(url: string, isExternal: boolean) {
*
* @__PURE__
*/
export function convertRes(res: OpenNextNodeResponse) {
export function convertRes(res: OpenNextNodeResponse): InternalResult {
// Format Next.js response to Lambda response
const statusCode = res.statusCode || 200;
// When using HEAD requests, it seems that flushHeaders is not called, not sure why
Expand All @@ -80,9 +81,9 @@ export function convertRes(res: OpenNextNodeResponse) {
? headers["content-type"][0]
: headers["content-type"],
);
const encoding = isBase64Encoded ? "base64" : "utf8";
const body = res.getBody().toString(encoding);
const body = Readable.toWeb(res);
return {
type: "core",
statusCode,
headers,
body,
Expand Down
Loading