diff --git a/packages/next/src/server/stream-utils/index.d.ts b/packages/next/src/server/stream-utils/index.d.ts index 3b3b1b4d8da5a..1ded12e019455 100644 --- a/packages/next/src/server/stream-utils/index.d.ts +++ b/packages/next/src/server/stream-utils/index.d.ts @@ -18,3 +18,8 @@ export function renderToString(element: React.ReactElement): Promise export function streamToString( stream: Readable | ReadableStream ): Promise + +export function chainStreams( + ...streams: ReadableStream[] +): ReadableStream +export function chainStreams(...streams: Readable[]): Readable diff --git a/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx b/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx new file mode 100644 index 0000000000000..3a88567e411db --- /dev/null +++ b/packages/next/src/server/stream-utils/stream-utils.edge.test.tsx @@ -0,0 +1,45 @@ +import { createBufferedTransformStream } from './stream-utils.edge' +import { renderToReadableStream } from 'react-dom/server.edge' +import { Suspense } from 'react' + +function App() { + const Data = async () => { + const data = await Promise.resolve('1') + return

{data}

+ } + + return ( + + + My App + + +

Hello, World!

+ Fallback}> + + + + + ) +} + +async function createInput(app = ): Promise> { + const stream = await renderToReadableStream(app) + await stream.allReady + return stream +} + +describe('createBufferedTransformStream', () => { + it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => { + const input = await createInput() + const actualStream = input.pipeThrough(createBufferedTransformStream()) + + const actualChunks = [] + // @ts-expect-error + for await (const chunks of actualStream) { + actualChunks.push(chunks) + } + + expect(actualChunks.length).toBe(1) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.test.tsx b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx new file mode 100644 index 0000000000000..833064aea3ef4 --- /dev/null +++ b/packages/next/src/server/stream-utils/stream-utils.node.test.tsx @@ -0,0 +1,92 @@ +import { + createBufferedTransformStream, + createInsertedHTMLStream, +} from './stream-utils.node' +import { PassThrough } from 'node:stream' +import { renderToPipeableStream } from 'react-dom/server.node' +import { Suspense } from 'react' + +function App() { + const Data = async () => { + const data = await Promise.resolve('1') + return

{data}

+ } + + return ( + + + My App + + +

Hello, World!

+ Fallback}> + + + + + ) +} + +function createInput(app = ): Promise { + return new Promise((resolve, reject) => { + const { pipe } = renderToPipeableStream(app, { + onAllReady() { + resolve(pipe(new PassThrough())) + }, + onShellError(error) { + reject(error) + }, + }) + }) +} + +describe('createBufferedTransformStream', () => { + it('should return a TransformStream that buffers input chunks across rendering boundaries', (done) => { + createInput().then((input) => { + const stream = input.pipe(createBufferedTransformStream()) + const actualChunks = [] + stream.on('data', (chunk) => { + actualChunks.push(chunk) + }) + + stream.resume() + + stream.on('finish', () => { + expect(actualChunks.length).toBe(1) + done() + }) + }) + }) +}) + +describe('createInsertedHTMLStream', () => { + it('should insert html to the beginning of the stream', async () => { + const insertedHTML = '' + const stream = createInsertedHTMLStream(() => Promise.resolve(insertedHTML)) + const input = await createInput() + const output = input.pipe(stream) + + const actualChunks = await new Promise((resolve) => { + const chunks: Buffer[] = [] + output.on('readable', () => { + let chunk + while (null !== (chunk = output.read())) { + chunks.push(chunk) + } + }) + output.on('end', () => { + resolve(chunks) + }) + }) + + console.log(actualChunks) + + expect(actualChunks.length).toBe(2) + const encoder = new TextEncoder() + const expected = encoder.encode(insertedHTML) + expect(actualChunks[0].indexOf(expected)).toBe(0) + expect( + new Uint8Array(actualChunks[0].subarray(expected.length)) + ).toStrictEqual(expected) + }) +}) diff --git a/packages/next/src/server/stream-utils/stream-utils.node.ts b/packages/next/src/server/stream-utils/stream-utils.node.ts index d3ee4e9d0fc33..4bda164c692e9 100644 --- a/packages/next/src/server/stream-utils/stream-utils.node.ts +++ b/packages/next/src/server/stream-utils/stream-utils.node.ts @@ -2,9 +2,18 @@ * By default, this file exports the methods from streams-utils.edge since all of those are based on Node.js web streams. * This file will then be an incremental re-implementation of all of those methods into Node.js only versions (based on proper Node.js Streams). */ - -import { PassThrough, type Readable, Writable } from 'node:stream' +import { + PassThrough, + Readable, + Transform, + Writable, + pipeline, +} from 'node:stream' import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node' +import { DetachedPromise } from '../../lib/detached-promise' +import { indexOfUint8Array } from './uint8array-helpers' +import { ENCODED_TAGS } from './encodedTags' + export * from './stream-utils.edge' @@ -76,3 +85,168 @@ export async function streamToString(stream: Readable) { return string } + +export function chainStreams(...streams: Readable[]): Readable { + if (streams.length === 0) { + throw new Error('Invariant: chainStreams requires at least one stream') + } + if (streams.length === 1) { + return streams[0] + } + + const transform = new Transform() + + pipeline(streams, transform, (err) => { + // to match `stream-utils.edge.ts`, this error is just ignored. + // but maybe we at least log it? + console.log(`Invariant: error when pipelining streams`) + console.error(err) + }) + + return transform +} + +export function streamFromString(string: string): Readable { + return Readable.from(string) +} + +export function createBufferedTransformStream(): Transform { + let buffered: Uint8Array[] = [] + let byteLength = 0 + let pending: DetachedPromise | undefined + + const flush = (transform: Transform) => { + if (pending) return + + const detached = new DetachedPromise() + pending = detached + + setImmediate(() => { + try { + const chunk = new Uint8Array(byteLength) + let copiedBytes = 0 + for (let i = 0; i < buffered.length; i++) { + chunk.set(buffered[i], copiedBytes) + copiedBytes += buffered[i].byteLength + } + buffered.length = 0 + byteLength = 0 + transform.push(chunk) + } catch { + } finally { + pending = undefined + detached.resolve() + } + }) + } + + return new Transform({ + transform(chunk, _, callback) { + buffered.push(chunk) + byteLength += chunk.byteLength + flush(this) + callback() + }, + final(callback) { + if (!pending) callback() + + pending?.promise.then(() => callback()) + }, + }) +} + +const encoder = new TextEncoder() + +export function createInsertedHTMLStream( + getServerInsertedHTML: () => Promise +): Transform { + return new Transform({ + transform(chunk, _, callback) { + getServerInsertedHTML() + .then((html) => { + if (html) { + this.push(encoder.encode(html)) + } + + return callback(null, chunk) + }) + .catch((err) => { + return callback(err) + }) + }, + }) +} + +export function createHeadInsertionTransformStream( + insert: () => Promise +): Transform { + let inserted = false + let freezing = false + let hasBytes = false + return new Transform({ + transform(chunk, _, callback) { + hasBytes = true + if (freezing) { + return callback(null, chunk) + } + insert() + .then((insertion) => { + if (inserted) { + if (insertion) { + this.push(encoder.encode(insertion)) + } + this.push(chunk) + freezing = true + } else { + const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD) + if (index !== -1) { + if (insertion) { + const encodedInsertion = encoder.encode(insertion) + const insertedHeadContent = new Uint8Array( + chunk.length + encodedInsertion.length + ) + insertedHeadContent.set(chunk.slice(0, index)) + insertedHeadContent.set(encodedInsertion, index) + insertedHeadContent.set( + chunk.slice(index), + index + encodedInsertion.length + ) + this.push(insertedHeadContent) + } else { + this.push(chunk) + } + freezing = true + inserted = true + } + } + + if (!inserted) { + this.push(chunk) + } else { + process.nextTick(() => { + freezing = false + }) + } + + callback() + }) + .catch((err) => { + callback(err) + }) + }, + final(callback) { + if (hasBytes) { + insert() + .then((insertion) => { + if (insertion) { + this.push(encoder.encode(insertion)) + callback() + } + }) + .catch((err) => { + callback(err) + }) + } + }, + }) +}