diff --git a/packages/runtime/plugin-runtime/src/core/server/stream/createReadableStream.worker.ts b/packages/runtime/plugin-runtime/src/core/server/stream/createReadableStream.worker.ts index 12baac9678c7..ed494c6394f1 100644 --- a/packages/runtime/plugin-runtime/src/core/server/stream/createReadableStream.worker.ts +++ b/packages/runtime/plugin-runtime/src/core/server/stream/createReadableStream.worker.ts @@ -68,9 +68,38 @@ export const createReadableStreamFromElement: CreateReadableStreamFromElement = const stream = new ReadableStream({ start(controller) { const pendingScripts: string[] = []; + let isClosed = false; + + const safeEnqueue = (chunk: Uint8Array | unknown) => { + if (isClosed) return; + try { + controller.enqueue(chunk as Uint8Array); + } catch { + isClosed = true; + } + }; + + const closeController = () => { + if (!isClosed) { + isClosed = true; + try { + controller.close(); + } catch { + // Controller already closed + } + } + }; + + const flushPendingScripts = () => { + for (const s of pendingScripts) { + safeEnqueue(encodeForWebStream(s)); + } + pendingScripts.length = 0; + }; + const enqueueScript = (script: string) => { if (shellChunkStatus === ShellChunkStatus.FINISH) { - controller.enqueue(encodeForWebStream(script)); + safeEnqueue(encodeForWebStream(script)); } else { pendingScripts.push(script); } @@ -93,51 +122,58 @@ export const createReadableStreamFromElement: CreateReadableStreamFromElement = : []; if (entries.length > 0) { - enqueueFromEntries(entries, config.nonce, (s: string) => - enqueueScript(s), - ); + enqueueFromEntries(entries, config.nonce, enqueueScript); } async function push() { - const { done, value } = await reader.read(); - if (done) { - controller.close(); - return; - } - if (shellChunkStatus !== ShellChunkStatus.FINISH) { - const chunk = new TextDecoder().decode(value); - - chunkVec.push(chunk); - - let concatedChunk = chunkVec.join(''); - if (concatedChunk.includes(ESCAPED_SHELL_STREAM_END_MARK)) { - concatedChunk = concatedChunk.replace( - ESCAPED_SHELL_STREAM_END_MARK, - '', - ); - - shellChunkStatus = ShellChunkStatus.FINISH; - - controller.enqueue( - encodeForWebStream( - `${shellBefore}${concatedChunk}${shellAfter}`, - ), - ); - // Flush any pending