Skip to content

Commit

Permalink
Ensure we cancel consumeUint8ArrayReadableStream if iteration breaks
Browse files Browse the repository at this point in the history
  • Loading branch information
jridgewell committed Jun 23, 2023
1 parent df7ef92 commit ec5b63a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 59 deletions.
3 changes: 2 additions & 1 deletion packages/runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
},
"devDependencies": {
"@types/node-fetch": "2",
"node-fetch": "2"
"node-fetch": "2",
"web-streams-polyfill": "4.0.0-beta.3"
},
"engines": {
"node": ">=14"
Expand Down
27 changes: 19 additions & 8 deletions packages/runtime/src/server/body-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,27 @@ function isUint8ArrayChunk(value: any): value is Uint8Array {
export async function* consumeUint8ArrayReadableStream(body?: ReadableStream) {
const reader = body?.getReader()
if (reader) {
while (true) {
const { done, value } = await reader.read()
if (done) {
return
}
let error
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
return
}

if (!isUint8ArrayChunk(value)) {
throw new TypeError('This ReadableStream did not return bytes.')
if (!isUint8ArrayChunk(value)) {
error = new TypeError('This ReadableStream did not return bytes.')
break
}
yield value
}
} finally {
if (error) {
reader.cancel(error)
throw error
} else {
reader.cancel()
}
yield value
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions packages/runtime/tests/body-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ReadableStream } from 'web-streams-polyfill'
import { consumeUint8ArrayReadableStream } from '../src/server/body-streams'

describe('consumeUint8ArrayReadableStream', () => {
test('closes body stream when iteration breaks', async () => {
const pull = jest.fn((controller: ReadableStreamDefaultController) => {
controller.enqueue(new Uint8Array(pull.mock.calls.length))
})
const cancel = jest.fn()
const readable = new ReadableStream({
pull,
cancel,
})

const consumable = consumeUint8ArrayReadableStream(readable)
for await (const chunk of consumable) {
expect(chunk).toEqual(new Uint8Array([0]))
break
}
expect(pull).toBeCalledTimes(2)
expect(cancel).toBeCalledTimes(1)
expect(cancel).toBeCalledWith(undefined)
})
})
106 changes: 56 additions & 50 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ec5b63a

Please sign in to comment.