Skip to content

Commit

Permalink
Implement createBufferedTransformStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan-Arrowood committed May 10, 2024
1 parent 22ee398 commit f2684dd
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 1 deletion.
78 changes: 78 additions & 0 deletions packages/next/src/server/stream-utils/stream-utils.node.test.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { createBufferedTransformStream } from './stream-utils.node'
import { PassThrough, type Readable } from 'node:stream'
import { renderToPipeableStream } from 'react-dom/server.node'
import { Suspense } from 'react'
import { streamToString } from '.'
import { StringDecoder } from 'node:string_decoder'

function App() {
const Data = async () => {
const data = await Promise.resolve('1')
return <h2>{data}</h2>
}
return (
<html>
<head>
<title>My App</title>
</head>
<body>
<h1>Hello, World!</h1>
<Suspense fallback={<h2>Fallback</h2>}>
<Data />
</Suspense>
</body>
</html>
)
}

function createInput(app = <App />): Promise<PassThrough> {
return new Promise((resolve) => {
const { pipe } = renderToPipeableStream(app, {
onShellReady() {
const pt = new PassThrough()
pipe(pt)
resolve(pt)
},
})
})
}

function getExpectedOutput(input: Readable) {
return streamToString(input.pipe(new PassThrough()))
}

describe('createBufferedTransformStream', () => {
it('should return a TransformStream that buffers input chunks across rendering boundaries', async () => {
const stream = createBufferedTransformStream()
const input = await createInput()
const output = input.pipe(stream)
const expectedCall = getExpectedOutput(input)

const actualChunks = await new Promise<Buffer[]>((resolve) => {
const chunks: Buffer[] = []
output.on('readable', () => {
let chunk
while (null !== (chunk = output.read())) {
chunks.push(chunk)
}
})
output.on('end', () => {
resolve(chunks)
})
})

const expected = await expectedCall

// React will send the suspense boundary piece second
expect(actualChunks.length).toBe(2)

let actual = ''
const decoder = new StringDecoder()
for (const chunk of actualChunks) {
actual += decoder.write(chunk)
}
actual += decoder.end()

expect(actual).toStrictEqual(expected)
})
})
48 changes: 47 additions & 1 deletion packages/next/src/server/stream-utils/stream-utils.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
*/
import {
PassThrough,
type Readable,
Readable,
Transform,
Writable,
pipeline,
} from 'node:stream'
import type { Options as RenderToPipeableStreamOptions } from 'react-dom/server.node'
import { StringDecoder } from 'node:string_decoder'
import { DetachedPromise } from '../../lib/detached-promise'

export * from './stream-utils.edge'

Expand Down Expand Up @@ -102,3 +103,48 @@ export function chainStreams(...streams: Readable[]): Readable {

return transform
}

export function streamFromString(string: string): Readable {
return Readable.from(string)
}

export function createBufferedTransformStream(): Transform {
let buffered: Buffer[] = []
let byteLength = 0
let pending = false

const flush = (transform: Transform) => {
if (pending) return

pending = true

process.nextTick(() => {
try {
const chunk = Buffer.alloc(byteLength)
let copiedBytes = 0
for (let i = 0; i < buffered.length; i++) {
chunk.set(buffered[i], copiedBytes)
copiedBytes += buffered[i].byteLength
}
buffered = []
byteLength = 0
transform.push(chunk)
} catch {
} finally {
pending = false
}
})
}

return new Transform({
transform(chunk, _, callback) {
buffered.push(chunk)
byteLength += chunk.byteLength
flush(this)
callback()
},
final(callback) {
callback()
},
})
}

0 comments on commit f2684dd

Please sign in to comment.