Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/open-next/src/core/routing/cacheInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ function createPprPartialResult(
"next-resume": "1",
},
rawPath: localizedPath,
body: Buffer.from(cachedValue.meta?.postponed || "", "utf-8"),
body: toReadableStream(cachedValue.meta?.postponed || ""),
},
result: {
type: "core",
Expand Down
6 changes: 4 additions & 2 deletions packages/open-next/src/core/routing/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ export async function handleMiddleware(

const middleware = await middlewareLoader();

const [bodyForMiddleware, bodyForForward] = internalEvent.body?.tee() ?? [undefined, undefined];

const result: Response = await middleware.default({
// `geo` is pre Next 15.
geo: {
Expand All @@ -84,7 +86,7 @@ export async function handleMiddleware(
trailingSlash: NextConfig.trailingSlash,
},
url,
body: convertBodyToReadableStream(internalEvent.method, internalEvent.body),
body: convertBodyToReadableStream(internalEvent.method, bodyForMiddleware),
} as unknown as Request);
const statusCode = result.status;

Expand Down Expand Up @@ -175,7 +177,7 @@ export async function handleMiddleware(
rawPath: new URL(newUrl).pathname,
type: internalEvent.type,
headers: { ...internalEvent.headers, ...reqHeaders },
body: internalEvent.body,
body: bodyForForward,
method: internalEvent.method,
query: middlewareQuery,
cookies: internalEvent.cookies,
Expand Down
9 changes: 2 additions & 7 deletions packages/open-next/src/core/routing/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,10 @@ export function unescapeRegex(str: string) {
/**
* @__PURE__
*/
export function convertBodyToReadableStream(method: string, body?: string | Buffer) {
export function convertBodyToReadableStream(method: string, body?: ReadableStream) {
if (method === "GET" || method === "HEAD") return undefined;
if (!body) return undefined;
return new ReadableStream({
start(controller) {
controller.enqueue(body);
controller.close();
},
});
return body;
}

enum CommonHeaders {
Expand Down
61 changes: 50 additions & 11 deletions packages/open-next/src/http/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

// @ts-nocheck
import http from "node:http";
import type { ReadableStream } from "node:stream/web";

export class IncomingMessage extends http.IncomingMessage {
constructor({
Expand All @@ -16,7 +17,7 @@ export class IncomingMessage extends http.IncomingMessage {
method: string;
url: string;
headers: Record<string, string | string[]>;
body?: Buffer;
body?: ReadableStream;
remoteAddress?: string;
}) {
super({
Expand All @@ -28,12 +29,6 @@ export class IncomingMessage extends http.IncomingMessage {
destroy: Function.prototype,
});

// Set the content length when there is a body.
// See https://httpwg.org/specs/rfc9110.html#field.content-length
if (body) {
headers["content-length"] ??= String(Buffer.byteLength(body));
}

Object.assign(this, {
ip: remoteAddress,
complete: true,
Expand All @@ -46,9 +41,53 @@ export class IncomingMessage extends http.IncomingMessage {
url,
});

this._read = () => {
this.push(body);
this.push(null);
};
this._read = (() => {
if (!body) {
return () => {
this.push(null);
};
}
const reader = body.getReader();
let reading = false;
let streamDone = false;

this.once("close", () => {
if (!streamDone) {
streamDone = true;
reader.cancel().catch(() => {});
}
});

const pump = () => {
reading = true;
reader
.read()
.then(({ done, value }) => {
if (done) {
streamDone = true;
reader.releaseLock();
this.push(null);
} else {
const canContinue = this.push(value);
if (canContinue) {
pump();
} else {
reading = false;
}
}
})
.catch((err) => {
streamDone = true;
reader.cancel().catch(() => {});
this.destroy(err);
});
};

return () => {
if (!reading) {
pump();
}
};
})();
}
}
5 changes: 2 additions & 3 deletions packages/open-next/src/overrides/converters/aws-apigw-v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ import type { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda";

import type { InternalEvent, InternalResult } from "@/types/open-next";
import type { Converter } from "@/types/overrides";
import { fromReadableStream } from "@/utils/stream";
import { fromReadableStream, toReadableStream } from "@/utils/stream";

import { debug } from "../../adapters/logger";

import { extractHostFromHeaders, removeUndefinedFromQuery } from "./utils";

function normalizeAPIGatewayProxyEventHeaders(event: APIGatewayProxyEvent): Record<string, string> {
event.multiValueHeaders;
const headers: Record<string, string> = {};

for (const [key, values] of Object.entries(event.multiValueHeaders || {})) {
Expand Down Expand Up @@ -73,7 +72,7 @@ async function convertFromAPIGatewayProxyEvent(event: APIGatewayProxyEvent): Pro
method: httpMethod,
rawPath: path,
url: `https://${extractHostFromHeaders(headers)}${path}${normalizeAPIGatewayProxyEventQueryParams(event)}`,
body: Buffer.from(body ?? "", isBase64Encoded ? "base64" : "utf8"),
body: body ? toReadableStream(body, isBase64Encoded) : undefined,
headers,
remoteAddress: requestContext.identity.sourceIp,
query: removeUndefinedFromQuery(normalizeAPIGatewayProxyEventMultiValueQueryStringParameters(event)),
Expand Down
14 changes: 8 additions & 6 deletions packages/open-next/src/overrides/converters/aws-apigw-v2.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { ReadableStream } from "node:stream/web";

import type { APIGatewayProxyEventV2, APIGatewayProxyResultV2 } from "aws-lambda";

import { parseSetCookieHeader } from "@/http/util";
import type { InternalEvent, InternalResult } from "@/types/open-next";
import type { Converter } from "@/types/overrides";
import { fromReadableStream } from "@/utils/stream";
import { fromReadableStream, toReadableStream } from "@/utils/stream";

import { debug } from "../../adapters/logger";
import { convertToQuery } from "../../core/routing/util";
Expand Down Expand Up @@ -36,18 +38,18 @@ const CloudFrontBlacklistedHeaders = [
"via",
];

function normalizeAPIGatewayProxyEventV2Body(event: APIGatewayProxyEventV2): Buffer {
function normalizeAPIGatewayProxyEventV2Body(event: APIGatewayProxyEventV2): ReadableStream | undefined {
const { body, isBase64Encoded } = event;
if (Buffer.isBuffer(body)) {
return body;
return toReadableStream(body);
}
if (typeof body === "string") {
return Buffer.from(body, isBase64Encoded ? "base64" : "utf8");
return toReadableStream(body, isBase64Encoded);
}
if (typeof body === "object") {
return Buffer.from(JSON.stringify(body));
return toReadableStream(JSON.stringify(body));
}
return Buffer.from("", "utf8");
return undefined;
}

function normalizeAPIGatewayProxyEventV2Headers(event: APIGatewayProxyEventV2): Record<string, string> {
Expand Down
4 changes: 2 additions & 2 deletions packages/open-next/src/overrides/converters/aws-cloudfront.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
import { parseSetCookieHeader } from "@/http/util";
import type { InternalEvent, InternalResult, MiddlewareResult } from "@/types/open-next";
import type { Converter } from "@/types/overrides";
import { fromReadableStream } from "@/utils/stream";
import { fromReadableStream, toReadableStream } from "@/utils/stream";

import { debug } from "../../adapters/logger";
import { convertToQuery, convertToQueryString } from "../../core/routing/util";
Expand Down Expand Up @@ -85,7 +85,7 @@ async function convertFromCloudFrontRequestEvent(event: CloudFrontRequestEvent):
method,
rawPath: uri,
url: `https://${extractHostFromHeaders(headers)}${uri}${querystring ? `?${querystring}` : ""}`,
body: Buffer.from(body?.data ?? "", body?.encoding === "base64" ? "base64" : "utf8"),
body: body?.data ? toReadableStream(body.data, body.encoding === "base64") : undefined,
headers,
remoteAddress: clientIp,
query: convertToQuery(querystring),
Expand Down
8 changes: 5 additions & 3 deletions packages/open-next/src/overrides/converters/edge.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Buffer } from "node:buffer";
import type { ReadableStream } from "node:stream/web";

import cookieParser from "cookie";

Expand Down Expand Up @@ -32,7 +32,7 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {
const shouldHaveBody = method !== "GET" && method !== "HEAD";

// Only read body for methods that should have one
const body = shouldHaveBody ? Buffer.from(await request.arrayBuffer()) : undefined;
const body = shouldHaveBody ? ((request.body as ReadableStream | undefined) ?? undefined) : undefined;

const cookieHeader = request.headers.get("cookie");
const cookies = cookieHeader ? (cookieParser.parse(cookieHeader) as Record<string, string>) : {};
Expand Down Expand Up @@ -106,7 +106,9 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {
}

// We should not return a body for statusCode's that doesn't allow bodies
const body = NULL_BODY_STATUSES.has(result.statusCode) ? null : (result.body as ReadableStream);
const body = NULL_BODY_STATUSES.has(result.statusCode)
? null
: (result.body as unknown as globalThis.ReadableStream);

return new Response(body, {
status: result.statusCode,
Expand Down
13 changes: 4 additions & 9 deletions packages/open-next/src/overrides/converters/node.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { IncomingMessage } from "node:http";
import { Readable } from "node:stream";
import type { ReadableStream } from "node:stream/web";

import cookieParser from "cookie";

Expand All @@ -10,15 +12,8 @@ import { extractHostFromHeaders, getQueryFromSearchParams } from "./utils.js";
const converter: Converter = {
convertFrom: async (event: unknown) => {
const req = event as IncomingMessage & { protocol?: string };
const body = await new Promise<Buffer>((resolve) => {
const chunks: Uint8Array[] = [];
req.on("data", (chunk) => {
chunks.push(chunk);
});
req.on("end", () => {
resolve(Buffer.concat(chunks));
});
});
const shouldHaveBody = req.method !== "GET" && req.method !== "HEAD";
const body: ReadableStream | undefined = shouldHaveBody ? Readable.toWeb(req) : undefined;

const headers = Object.fromEntries(
Object.entries(req.headers ?? {})
Expand Down
23 changes: 20 additions & 3 deletions packages/open-next/src/overrides/proxyExternalRequest/node.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { request } from "node:https";
import { Readable } from "node:stream";
import type { ReadableStream } from "node:stream/web";

import type { InternalEvent, InternalResult } from "@/types/open-next";
import type { ProxyExternalRequest } from "@/types/overrides";
Expand Down Expand Up @@ -35,6 +36,15 @@ const nodeProxy: ProxyExternalRequest = {
const { url, headers, method, body } = internalEvent;
debug("proxyRequest", url);
return new Promise<InternalResult>((resolve, reject) => {
let hasRejected = false;
const rejectOnce = (e: Error) => {
if (hasRejected) {
return;
}

hasRejected = true;
reject(e);
};
const filteredHeaders = filterHeadersForProxy(headers);
debug("filteredHeaders", filteredHeaders);
const req = request(
Expand Down Expand Up @@ -67,15 +77,22 @@ const nodeProxy: ProxyExternalRequest = {

_res.on("error", (e) => {
error("proxyRequest error", e);
reject(e);
rejectOnce(e);
});
}
);
req.on("error", (e) => {
error("proxyRequest error", e);
rejectOnce(e);
});

if (body && method !== "GET" && method !== "HEAD") {
req.write(body);
Readable.fromWeb(body as ReadableStream<Uint8Array>)
.on("error", rejectOnce)
.pipe(req);
} else {
req.end();
}
req.end();
});
Comment thread
conico974 marked this conversation as resolved.
},
};
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/types/open-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export type InternalEvent = {
readonly rawPath: string;
// Full URL - starts with "https://on/" when the host is not available
readonly url: string;
readonly body?: Buffer;
readonly body?: ReadableStream;
//TODO: change the type of headers to Record<string, string | string[]>
readonly headers: Record<string, string>;
readonly query: Record<string, string | string[]>;
Expand Down
14 changes: 8 additions & 6 deletions packages/tests-unit/tests/converters/aws-apigw-v1.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Readable } from "node:stream";

import converter from "@opennextjs/aws/overrides/converters/aws-apigw-v1.js";
import { fromReadableStream } from "@opennextjs/aws/utils/stream.js";
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda";

describe("convertTo", () => {
Expand Down Expand Up @@ -88,7 +89,7 @@ describe("convertFrom", () => {
method: "POST",
rawPath: "/",
url: "https://on/",
body: Buffer.from('{"message":"Hello, world!"}'),
body: expect.any(ReadableStream),
headers: {
"content-type": "application/json",
},
Expand Down Expand Up @@ -127,7 +128,7 @@ describe("convertFrom", () => {
method: "POST",
rawPath: "/",
url: "https://on/",
body: Buffer.from('{"message":"Hello, world!"}'),
body: expect.any(ReadableStream),
headers: {
test: "test1,test2",
},
Expand Down Expand Up @@ -168,7 +169,7 @@ describe("convertFrom", () => {
method: "POST",
rawPath: "/",
url: "https://on/?test=test",
body: Buffer.from('{"message":"Hello, world!"}'),
body: expect.any(ReadableStream),
headers: {},
remoteAddress: "::1",
query: {
Expand Down Expand Up @@ -209,7 +210,7 @@ describe("convertFrom", () => {
method: "POST",
rawPath: "/",
url: "https://on/?test=testA&test=testB",
body: Buffer.from('{"message":"Hello, world!"}'),
body: expect.any(ReadableStream),
headers: {},
remoteAddress: "::1",
query: {
Expand Down Expand Up @@ -250,7 +251,7 @@ describe("convertFrom", () => {
method: "POST",
rawPath: "/",
url: "https://on/",
body: Buffer.from('{"message":"Hello, world!"}'),
body: expect.any(ReadableStream),
headers: {
"content-type": "application/json",
cookie: "test1=1,test2=2",
Expand Down Expand Up @@ -293,13 +294,14 @@ describe("convertFrom", () => {
method: "GET",
rawPath: "/",
url: "https://on/",
body: Buffer.from("Hello, world!"),
body: expect.any(ReadableStream),
headers: {
"content-type": "application/json",
},
remoteAddress: "::1",
query: {},
cookies: {},
});
expect(await fromReadableStream(response.body!)).toEqual("Hello, world!");
});
});
Loading
Loading