Skip to content

Commit

Permalink
Stream request body instead of buffering it in memory (#8084)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthew Phillips <matthew@matthewphillips.info>
  • Loading branch information
hbgl and matthewp committed Aug 15, 2023
1 parent 3755424 commit 560e459
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 57 deletions.
6 changes: 6 additions & 0 deletions .changeset/lemon-lobsters-do.md
@@ -0,0 +1,6 @@
---
'@astrojs/node': patch
'astro': patch
---

Stream request body instead of buffering it in memory.
135 changes: 84 additions & 51 deletions packages/astro/src/core/app/node.ts
Expand Up @@ -9,84 +9,117 @@ import { App, type MatchOptions } from './index.js';

const clientAddressSymbol = Symbol.for('astro.clientAddress');

function createRequestFromNodeRequest(req: NodeIncomingMessage, body?: Uint8Array): Request {
type CreateNodeRequestOptions = {
emptyBody?: boolean;
};

type BodyProps = Partial<RequestInit>;

function createRequestFromNodeRequest(
req: NodeIncomingMessage,
options?: CreateNodeRequestOptions
): Request {
const protocol =
req.socket instanceof TLSSocket || req.headers['x-forwarded-proto'] === 'https'
? 'https'
: 'http';
const hostname = req.headers.host || req.headers[':authority'];
const url = `${protocol}://${hostname}${req.url}`;
const rawHeaders = req.headers as Record<string, any>;
const entries = Object.entries(rawHeaders);
const headers = makeRequestHeaders(req);
const method = req.method || 'GET';
let bodyProps: BodyProps = {};
const bodyAllowed = method !== 'HEAD' && method !== 'GET' && !options?.emptyBody;
if (bodyAllowed) {
bodyProps = makeRequestBody(req);
}
const request = new Request(url, {
method,
headers: new Headers(entries),
body: ['HEAD', 'GET'].includes(method) ? null : body,
headers,
...bodyProps,
});
if (req.socket?.remoteAddress) {
Reflect.set(request, clientAddressSymbol, req.socket.remoteAddress);
}
return request;
}

class NodeIncomingMessage extends IncomingMessage {
/**
* The read-only body property of the Request interface contains a ReadableStream with the body contents that have been added to the request.
*/
body?: unknown;
function makeRequestHeaders(req: NodeIncomingMessage): Headers {
const headers = new Headers();
for (const [name, value] of Object.entries(req.headers)) {
if (value === undefined) {
continue;
}
if (Array.isArray(value)) {
for (const item of value) {
headers.append(name, item);
}
} else {
headers.append(name, value);
}
}
return headers;
}

export class NodeApp extends App {
match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) {
return super.match(req instanceof Request ? req : createRequestFromNodeRequest(req), opts);
}
render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) {
function makeRequestBody(req: NodeIncomingMessage): BodyProps {
if (req.body !== undefined) {
if (typeof req.body === 'string' && req.body.length > 0) {
return super.render(
req instanceof Request ? req : createRequestFromNodeRequest(req, Buffer.from(req.body)),
routeData,
locals
);
return { body: Buffer.from(req.body) };
}

if (typeof req.body === 'object' && req.body !== null && Object.keys(req.body).length > 0) {
return super.render(
req instanceof Request
? req
: createRequestFromNodeRequest(req, Buffer.from(JSON.stringify(req.body))),
routeData,
locals
);
return { body: Buffer.from(JSON.stringify(req.body)) };
}

if ('on' in req) {
let body = Buffer.from([]);
let reqBodyComplete = new Promise((resolve, reject) => {
req.on('data', (d) => {
body = Buffer.concat([body, d]);
});
req.on('end', () => {
resolve(body);
});
req.on('error', (err) => {
reject(err);
});
});
// This covers all async iterables including Readable and ReadableStream.
if (
typeof req.body === 'object' &&
req.body !== null &&
typeof (req.body as any)[Symbol.asyncIterator] !== 'undefined'
) {
return asyncIterableToBodyProps(req.body as AsyncIterable<any>);
}
}

// Return default body.
return asyncIterableToBodyProps(req);
}

function asyncIterableToBodyProps(iterable: AsyncIterable<any>): BodyProps {
return {
// Node uses undici for the Request implementation. Undici accepts
// a non-standard async iterable for the body.
// @ts-expect-error
body: iterable,
// The duplex property is required when using a ReadableStream or async
// iterable for the body. The type definitions do not include the duplex
// property because they are not up-to-date.
// @ts-expect-error
duplex: 'half',
} satisfies BodyProps;
}

class NodeIncomingMessage extends IncomingMessage {
/**
* Allow the request body to be explicitly overridden. For example, this
* is used by the Express JSON middleware.
*/
body?: unknown;
}

return reqBodyComplete.then(() => {
return super.render(
req instanceof Request ? req : createRequestFromNodeRequest(req, body),
routeData,
locals
);
export class NodeApp extends App {
match(req: NodeIncomingMessage | Request, opts: MatchOptions = {}) {
if (!(req instanceof Request)) {
req = createRequestFromNodeRequest(req, {
emptyBody: true,
});
}
return super.render(
req instanceof Request ? req : createRequestFromNodeRequest(req),
routeData,
locals
);
return super.match(req, opts);
}
render(req: NodeIncomingMessage | Request, routeData?: RouteData, locals?: object) {
if (!(req instanceof Request)) {
req = createRequestFromNodeRequest(req);
}
return super.render(req, routeData, locals);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/integrations/node/package.json
Expand Up @@ -49,7 +49,7 @@
"chai": "^4.3.7",
"cheerio": "1.0.0-rc.12",
"mocha": "^9.2.2",
"node-mocks-http": "^1.12.2",
"node-mocks-http": "^1.13.0",
"undici": "^5.22.1"
}
}
45 changes: 42 additions & 3 deletions packages/integrations/node/test/api-route.test.js
@@ -1,6 +1,7 @@
import nodejs from '../dist/index.js';
import { loadFixture, createRequestAndResponse } from './test-utils.js';
import { expect } from 'chai';
import crypto from 'node:crypto';

describe('API routes', () => {
/** @type {import('./test-utils').Fixture} */
Expand All @@ -22,9 +23,11 @@ describe('API routes', () => {
url: '/recipes',
});

handler(req, res);
req.once('async_iterator', () => {
req.send(JSON.stringify({ id: 2 }));
});

req.send(JSON.stringify({ id: 2 }));
handler(req, res);

let [buffer] = await done;

Expand All @@ -43,11 +46,47 @@ describe('API routes', () => {
url: '/binary',
});

req.once('async_iterator', () => {
req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5])));
});

handler(req, res);
req.send(Buffer.from(new Uint8Array([1, 2, 3, 4, 5])));

let [out] = await done;
let arr = Array.from(new Uint8Array(out.buffer));
expect(arr).to.deep.equal([5, 4, 3, 2, 1]);
});

it('Can post large binary data', async () => {
const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');

let { req, res, done } = createRequestAndResponse({
method: 'POST',
url: '/hash',
});

handler(req, res);

let expectedDigest = null;
req.once('async_iterator', () => {
// Send 256MB of garbage data in 256KB chunks. This should be fast (< 1sec).
let remainingBytes = 256 * 1024 * 1024;
const chunkSize = 256 * 1024;

const hash = crypto.createHash('sha256');
while (remainingBytes > 0) {
const size = Math.min(remainingBytes, chunkSize);
const chunk = Buffer.alloc(size, Math.floor(Math.random() * 256));
hash.update(chunk);
req.emit('data', chunk);
remainingBytes -= size;
}

req.emit('end');
expectedDigest = hash.digest();
});

let [out] = await done;
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
});
});
@@ -0,0 +1,16 @@
import crypto from 'node:crypto';

export async function post({ request }: { request: Request }) {
const hash = crypto.createHash('sha256');

const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
for await (const chunk of iterable) {
hash.update(chunk);
}

return new Response(hash.digest(), {
headers: {
'Content-Type': 'application/octet-stream'
}
});
}
20 changes: 18 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 560e459

Please sign in to comment.