Skip to content

Commit

Permalink
Implement Node.js Stream support (#65704)
Browse files Browse the repository at this point in the history
WIP - 1:1 implementation of stream helpers utility file to support
Node.js streams. Uses multiplex pattern. Also includes minor testing
where necessary.

---------

Co-authored-by: Josh Story <story@hey.com>
  • Loading branch information
Ethan-Arrowood and gnoff committed Jun 19, 2024
1 parent 67ed8e4 commit f846f7c
Show file tree
Hide file tree
Showing 25 changed files with 1,359 additions and 137 deletions.
5 changes: 5 additions & 0 deletions packages/next/src/build/create-compiler-aliases.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ export function createRSCAliases(
'react-dom/server.browser$': `next/dist/build/webpack/alias/react-dom-server-browser${bundledReactChannel}.js`,
'react-dom/server.node$': `next/dist/build/webpack/alias/react-dom-server-browser${bundledReactChannel}.js`,
// react-server-dom-webpack alias
'react-server-dom-webpack/client$': `next/dist/compiled/react-server-dom-webpack${bundledReactChannel}/client`,
'react-server-dom-webpack/client.edge$': `next/dist/compiled/react-server-dom-webpack${bundledReactChannel}/client.edge`,
'react-server-dom-webpack/client.node$': `next/dist/compiled/react-server-dom-webpack${bundledReactChannel}/client.node`,
'react-server-dom-webpack/server.edge$': `next/dist/compiled/react-server-dom-webpack${bundledReactChannel}/server.edge`,
'react-server-dom-webpack/server.node$': `next/dist/compiled/react-server-dom-webpack${bundledReactChannel}/server.node`,
...createRSCRendererAliases(bundledReactChannel),
}

Expand Down
8 changes: 4 additions & 4 deletions packages/next/src/server/app-render/action-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ function limitUntrustedHeaderValueForLogs(value: string) {
return value.length > 100 ? value.slice(0, 100) + '...' : value
}

type ServerModuleMap = Record<
export type ServerModuleMap = Record<
string,
| {
id: string
Expand Down Expand Up @@ -609,7 +609,7 @@ export async function handleAction({
// TODO-APP: Add streaming support
const formData = await req.request.formData()
if (isFetchAction) {
bound = await decodeReply(formData, serverModuleMap)
bound = await decodeReply<any[]>(formData, serverModuleMap)
} else {
const action = await decodeAction(formData, serverModuleMap)
if (typeof action === 'function') {
Expand Down Expand Up @@ -648,9 +648,9 @@ export async function handleAction({

if (isURLEncodedAction) {
const formData = formDataFromSearchQueryString(actionData)
bound = await decodeReply(formData, serverModuleMap)
bound = await decodeReply<any[]>(formData, serverModuleMap)
} else {
bound = await decodeReply(actionData, serverModuleMap)
bound = await decodeReply<any[]>(actionData, serverModuleMap)
}
}
} else if (
Expand Down
150 changes: 97 additions & 53 deletions packages/next/src/server/app-render/app-render.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ import { isNodeNextRequest } from '../base-http/helpers'
import { HeadersAdapter } from '../web/spec-extension/adapters/headers'
import { parseParameter } from '../../shared/lib/router/utils/route-regex'
import { parseRelativeUrl } from '../../shared/lib/router/utils/parse-relative-url'
import type { Readable } from 'stream'

export type GetDynamicParamFromSegment = (
// [slug] / [[slug]] / [...slug]
Expand Down Expand Up @@ -315,7 +316,7 @@ async function generateFlight(
const {
componentMod: {
tree: loaderTree,
renderToReadableStream,
renderToStream,
createDynamicallyTrackedSearchParams,
},
getDynamicParamFromSegment,
Expand Down Expand Up @@ -362,18 +363,35 @@ async function generateFlight(

// For app dir, use the bundled version of Flight server renderer (renderToReadableStream)
// which contains the subset React.
const flightReadableStream = renderToReadableStream(
const flightReadableStream = renderToStream(
options
? [options.actionResult, buildIdFlightDataPair]
: buildIdFlightDataPair,
ctx.clientReferenceManifest.clientModules,
{
onError: ctx.flightDataRendererErrorHandler,
// @ts-expect-error This `renderToStream` wraps the `renderToReadableStream` or `renderToPipeableStream` from `react-server-dom-webpack` which doesn't specify a `nonce` prop on either options object. Leaving it in in case some other method is being used here.
nonce: ctx.nonce,
}
)

return new FlightRenderResult(flightReadableStream)
let resultStream: Readable | ReadableStream<Uint8Array>
if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!(flightReadableStream instanceof ReadableStream)
) {
const { PassThrough } =
require('node:stream') as typeof import('node:stream')
resultStream = flightReadableStream.pipe(new PassThrough())
} else if (!(flightReadableStream instanceof ReadableStream)) {
throw new Error(
'Invariant. Stream is not a ReadableStream in non-Node.js runtime.'
)
} else {
resultStream = flightReadableStream
}

return new FlightRenderResult(resultStream)
}

type RenderToStreamResult = {
Expand Down Expand Up @@ -598,13 +616,13 @@ function ReactServerEntrypoint<T>({
clientReferenceManifest,
nonce,
}: {
reactServerStream: BinaryStreamOf<T>
reactServerStream: Readable | BinaryStreamOf<T>
preinitScripts: () => void
clientReferenceManifest: NonNullable<RenderOpts['clientReferenceManifest']>
nonce?: string
}): T {
preinitScripts()
const response = useFlightStream(
const response = useFlightStream<T>(
reactServerStream,
clientReferenceManifest,
nonce
Expand Down Expand Up @@ -954,17 +972,29 @@ async function renderToHTMLOrFlightImpl(
// We kick off the Flight Request (render) here. It is ok to initiate the render in an arbitrary
// place however it is critical that we only construct the Flight Response inside the SSR
// render so that directives like preloads are correctly piped through
const serverStream = ComponentMod.renderToReadableStream(
const serverStream = ComponentMod.renderToStream(
<ReactServerApp tree={tree} ctx={ctx} asNotFound={asNotFound} />,
clientReferenceManifest.clientModules,
{
onError: serverComponentsErrorHandler,
// @ts-expect-error This `renderToStream` wraps the `renderToReadableStream` or `renderToPipeableStream` from `react-server-dom-webpack` which doesn't specify a `nonce` prop on either options object. Leaving it in in case some other method is being used here.
nonce,
}
)

// We are going to consume this render both for SSR and for inlining the flight data
let [renderStream, dataStream] = serverStream.tee()
let renderStream, dataStream

if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!(serverStream instanceof ReadableStream)
) {
const { teeReadable } = require('../stream-utils')
;[renderStream, dataStream] = teeReadable(serverStream)
} else {
// We are going to consume this render both for SSR and for inlining the flight data
// @ts-ignore
;[renderStream, dataStream] = serverStream.tee()
}

const children = (
<HeadManagerContext.Provider
Expand Down Expand Up @@ -1037,19 +1067,6 @@ async function renderToHTMLOrFlightImpl(
try {
let { stream, postponed, resumed } = await renderer.render(children)

if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!(stream instanceof ReadableStream)
) {
const { Readable } = require('node:stream')
stream = Readable.toWeb(stream) as ReadableStream<Uint8Array>
}

// TODO (@Ethan-Arrowood): Remove this when stream utilities support both stream types.
if (!(stream instanceof ReadableStream)) {
throw new Error("Invariant: stream isn't a ReadableStream")
}

const prerenderState = staticGenerationStore.prerenderState
if (prerenderState) {
/**
Expand Down Expand Up @@ -1085,14 +1102,28 @@ async function renderToHTMLOrFlightImpl(
// It is possible in the set of stream transforms for Dynamic HTML vs Dynamic Data may differ but currently both states
// require the same set so we unify the code path here
return {
// @ts-ignore
stream: await continueDynamicPrerender(stream, {
getServerInsertedHTML,
}),
}
} else {
// We may still be rendering the RSC stream even though the HTML is finished.
// We wait for the RSC stream to complete and check again if dynamic was used
const [original, flightSpy] = dataStream.tee()
let original, flightSpy

if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!(dataStream instanceof ReadableStream)
) {
const { teeReadable } = require('../stream-utils')
;[original, flightSpy] = teeReadable(dataStream)
} else {
// We are going to consume this render both for SSR and for inlining the flight data
// @ts-ignore
;[original, flightSpy] = dataStream.tee()
}

dataStream = original

await flightRenderComplete(flightSpy)
Expand All @@ -1115,6 +1146,7 @@ async function renderToHTMLOrFlightImpl(
// It is possible in the set of stream transforms for Dynamic HTML vs Dynamic Data may differ but currently both states
// require the same set so we unify the code path here
return {
// @ts-ignore
stream: await continueDynamicPrerender(stream, {
getServerInsertedHTML,
}),
Expand Down Expand Up @@ -1148,7 +1180,14 @@ async function renderToHTMLOrFlightImpl(

// We don't actually want to render anything so we just pass a stream
// that never resolves. The resume call is going to abort immediately anyway
const foreverStream = new ReadableStream<Uint8Array>()
let foreverStream

if (process.env.NEXT_RUNTIME === 'nodejs') {
const { Readable } = require('node:stream')
foreverStream = new Readable()
} else {
foreverStream = new ReadableStream<Uint8Array>()
}

const resumeChildren = (
<HeadManagerContext.Provider
Expand All @@ -1171,37 +1210,42 @@ async function renderToHTMLOrFlightImpl(
const { stream: resumeStream } =
await resumeRenderer.render(resumeChildren)

// FIXME: shouldn't need this when chainStreams supports ReadableStream | Readable
if (!(resumeStream instanceof ReadableStream)) {
throw new Error("Invariant: stream wasn't a ReadableStream")
}
// First we write everything from the prerender, then we write everything from the aborted resume render
renderedHTMLStream = chainStreams(stream, resumeStream)
renderedHTMLStream = chainStreams(
// @ts-ignore
stream,
resumeStream
)
}

let inlinedDataStream = createInlinedDataReadableStream(
dataStream,
nonce,
formState
)

return {
stream: await continueStaticPrerender(renderedHTMLStream, {
inlinedDataStream: createInlinedDataReadableStream(
dataStream,
nonce,
formState
),
// @ts-ignore
inlinedDataStream,
getServerInsertedHTML,
}),
}
}
}
} else if (renderOpts.postponed) {
// This is a continuation of either an Incomplete or Dynamic Data Prerender.
const inlinedDataStream = createInlinedDataReadableStream(
let inlinedDataStream = createInlinedDataReadableStream(
dataStream,
nonce,
formState
)

if (resumed) {
// We have new HTML to stream and we also need to include server inserted HTML
return {
stream: await continueDynamicHTMLResume(stream, {
// @ts-ignore
inlinedDataStream,
getServerInsertedHTML,
}),
Expand All @@ -1210,6 +1254,7 @@ async function renderToHTMLOrFlightImpl(
// We are continuing a Dynamic Data Prerender and simply need to append new inlined flight data
return {
stream: await continueDynamicDataResume(stream, {
// @ts-ignore
inlinedDataStream,
}),
}
Expand Down Expand Up @@ -1308,20 +1353,35 @@ async function renderToHTMLOrFlightImpl(
nonce
)

const errorServerStream = ComponentMod.renderToReadableStream(
const errorServerStream = ComponentMod.renderToStream(
<ReactServerError tree={tree} ctx={ctx} errorType={errorType} />,
clientReferenceManifest.clientModules,
{
onError: serverComponentsErrorHandler,
// @ts-expect-error This `renderToStream` wraps the `renderToReadableStream` or `renderToPipeableStream` from `react-server-dom-webpack` which doesn't specify a `nonce` prop on either options object. Leaving it in in case some other method is being used here.
nonce,
}
)

let resultStream2: Readable | ReadableStream<Uint8Array>
if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!(errorServerStream instanceof ReadableStream)
) {
const { PassThrough } =
require('node:stream') as typeof import('node:stream')
resultStream2 = errorServerStream.pipe(new PassThrough())
} else if (!(errorServerStream instanceof ReadableStream)) {
throw new Error('Invariant. Stream is not ReadableStream')
} else {
resultStream2 = errorServerStream
}

try {
let fizzStream = await renderToInitialFizzStream({
element: (
<ReactServerEntrypoint
reactServerStream={errorServerStream}
reactServerStream={resultStream2}
preinitScripts={errorPreinitScripts}
clientReferenceManifest={clientReferenceManifest}
nonce={nonce}
Expand All @@ -1335,22 +1395,6 @@ async function renderToHTMLOrFlightImpl(
},
})

if (
process.env.NEXT_RUNTIME === 'nodejs' &&
!(fizzStream instanceof ReadableStream)
) {
const { Readable } = require('node:stream')

fizzStream = Readable.toWeb(
fizzStream
) as ReadableStream<Uint8Array>
}

// TODO (@Ethan-Arrowood): Remove this when stream utilities support both stream types.
if (!(fizzStream instanceof ReadableStream)) {
throw new Error("Invariant: stream isn't a ReadableStream")
}

return {
// Returning the error that was thrown so it can be used to handle
// the response in the caller.
Expand All @@ -1369,8 +1413,8 @@ async function renderToHTMLOrFlightImpl(
polyfills,
renderServerInsertedHTML,
serverCapturedErrors: [],
tracingMetadata: undefined,
basePath: renderOpts.basePath,
tracingMetadata: tracingMetadata,
}),
serverInsertedHTMLToHead: true,
validateRootLayout,
Expand Down
7 changes: 3 additions & 4 deletions packages/next/src/server/app-render/entry-base.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// eslint-disable-next-line import/no-extraneous-dependencies
export {
renderToReadableStream,
decodeReply,
renderToStream,
decodeAction,
decodeFormState,
} from 'react-server-dom-webpack/server.edge'
decodeReply,
} from './react-server-dom-webpack'

import AppRouter from '../../client/components/app-router'
import LayoutRouter from '../../client/components/layout-router'
Expand Down
3 changes: 2 additions & 1 deletion packages/next/src/server/app-render/flight-render-result.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import type { Readable } from 'node:stream'
import { RSC_CONTENT_TYPE_HEADER } from '../../client/components/app-router-headers'
import RenderResult from '../render-result'

/**
* Flight Response is always set to RSC_CONTENT_TYPE_HEADER to ensure it does not get interpreted as HTML.
*/
export class FlightRenderResult extends RenderResult {
constructor(response: string | ReadableStream<Uint8Array>) {
constructor(response: string | ReadableStream<Uint8Array> | Readable) {
super(response, { contentType: RSC_CONTENT_TYPE_HEADER, metadata: {} })
}
}
Loading

0 comments on commit f846f7c

Please sign in to comment.