diff --git a/docs/content/docs/foundations/streaming.mdx b/docs/content/docs/foundations/streaming.mdx index 6f8a860d00..c45a57245b 100644 --- a/docs/content/docs/foundations/streaming.mdx +++ b/docs/content/docs/foundations/streaming.mdx @@ -474,11 +474,11 @@ async function uploadResult(stream: ReadableStream) { When writing to a stream in a step function, there is an important contract to understand: -**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. +**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it. -**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. +**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; while explicitly closing the stream is safe and recommended for cleanup, it is not required to prevent the function from hanging. **Correct pattern - complete all writes before releasing:** diff --git a/packages/core/src/flushable-stream.test.ts b/packages/core/src/flushable-stream.test.ts index 141607f5fd..25effefc06 100644 --- a/packages/core/src/flushable-stream.test.ts +++ b/packages/core/src/flushable-stream.test.ts @@ -3,6 +3,7 @@ import { createFlushableState, flushablePipe, LOCK_POLL_INTERVAL_MS, + pollReadableLock, pollWritableLock, } from './flushable-stream.js'; @@ -107,4 +108,224 @@ describe('flushable stream behavior', () => { // Stream should be closed (user closed it) expect(streamClosed).toBe(true); }); + + it('should handle write errors during pipe operations', async () => { + const chunks: string[] = []; + + // Create a sink that throws on write + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + if (chunk === 'error') { + throw new Error('Write failed'); + } + }, + }); + + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + pollWritableLock(writable, state); + + // Write data that will cause an error + const userWriter = writable.getWriter(); + await userWriter.write('chunk1'); + await userWriter.write('error'); + + // Wait for the error to propagate + await new Promise((r) => setTimeout(r, 50)); + + // The promise should be rejected + await expect(state.promise).rejects.toThrow('Write failed'); + + // First chunk should have been written before error + expect(chunks).toContain('chunk1'); + }); + + it('should test with pollReadableLock', async () => { + // Create a readable stream that we can control + let controller: ReadableStreamDefaultController; + const source = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + const chunks: string[] = []; + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + }); + + const state = createFlushableState(); + + // Start piping in background + flushablePipe(source, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + // Start polling for readable lock release + pollReadableLock(source, state); + + // Enqueue some data and then close + controller?.enqueue('data1'); + controller?.enqueue('data2'); + controller?.close(); + + // Wait for the pipe to complete + await new Promise((r) => setTimeout(r, 100)); + + // The promise should resolve + await expect(state.promise).resolves.toBeUndefined(); + + // Chunks should have been written + expect(chunks).toContain('data1'); + expect(chunks).toContain('data2'); + }); + + it('should handle concurrent writes correctly', async () => { + const chunks: string[] = []; + + const mockSink = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + }); + + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => { + // Errors handled via state.reject + }); + + pollWritableLock(writable, state); + + // Perform multiple concurrent writes + const userWriter = writable.getWriter(); + await Promise.all([ + userWriter.write('chunk1'), + userWriter.write('chunk2'), + userWriter.write('chunk3'), + ]); + + userWriter.releaseLock(); + + // Wait for polling to detect lock release + await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); + + // Promise should resolve + await expect(state.promise).resolves.toBeUndefined(); + + // All chunks should be written + expect(chunks).toHaveLength(3); + expect(chunks).toContain('chunk1'); + expect(chunks).toContain('chunk2'); + expect(chunks).toContain('chunk3'); + }); + + it('should prevent multiple simultaneous polling operations on writable', async () => { + const { readable, writable } = new TransformStream(); + const mockSink = new WritableStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => {}); + + // Start polling multiple times + pollWritableLock(writable, state); + pollWritableLock(writable, state); + pollWritableLock(writable, state); + + // Should only have one interval active + expect(state.writablePollingInterval).toBeDefined(); + + // Write and release to clean up + const userWriter = writable.getWriter(); + await userWriter.write('data'); + userWriter.releaseLock(); + + // Wait for cleanup + await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50)); + }); + + it('should prevent multiple simultaneous polling operations on readable', async () => { + let controller: ReadableStreamDefaultController; + const source = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + const mockSink = new WritableStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(source, mockSink, state).catch(() => {}); + + // Start polling multiple times + pollReadableLock(source, state); + pollReadableLock(source, state); + pollReadableLock(source, state); + + // Should only have one interval active + expect(state.readablePollingInterval).toBeDefined(); + + // Close to clean up + controller?.close(); + + // Wait for cleanup + await new Promise((r) => setTimeout(r, 100)); + }); + + it('should handle stream ending while pending operations are in flight', async () => { + const chunks: string[] = []; + let writeDelay = 0; + + const mockSink = new WritableStream({ + async write(chunk) { + // Simulate slow write + await new Promise((r) => setTimeout(r, writeDelay)); + chunks.push(chunk); + }, + }); + + const { readable, writable } = new TransformStream(); + const state = createFlushableState(); + + // Start piping in background + flushablePipe(readable, mockSink, state).catch(() => {}); + + pollWritableLock(writable, state); + + const userWriter = writable.getWriter(); + + // Write first chunk normally + await userWriter.write('fast'); + + // Set delay for next write + writeDelay = 100; + + // Start slow write and immediately close + const slowWrite = userWriter.write('slow'); + await userWriter.close(); + + // Wait for everything to complete + await slowWrite; + await new Promise((r) => setTimeout(r, 150)); + + // Promise should resolve + await expect(state.promise).resolves.toBeUndefined(); + + // Both chunks should have been written + expect(chunks).toContain('fast'); + expect(chunks).toContain('slow'); + }); }); diff --git a/packages/core/src/flushable-stream.ts b/packages/core/src/flushable-stream.ts index 1c1d82b877..be3469fc7d 100644 --- a/packages/core/src/flushable-stream.ts +++ b/packages/core/src/flushable-stream.ts @@ -1,6 +1,23 @@ import { type PromiseWithResolvers, withResolvers } from '@workflow/utils'; -/** Polling interval for lock release detection */ +/** + * Polling interval (in ms) for lock release detection. + * + * The Web Streams API does not expose an event for "lock released but stream + * still open"; we can only distinguish that state by periodically attempting + * to acquire a reader/writer. For that reason we use polling instead of a + * fully event-driven approach here. + * + * 100ms is a compromise between: + * - Latency: how quickly we notice that the user has released their lock, and + * - Cost/CPU usage: how often timers fire, especially with many concurrent + * streams or in serverless environments where billed time matters. + * + * This value should only be changed with care, as decreasing it will + * increase polling frequency (and thus potential cost), while increasing it + * will add worst-case delay before the `done` promise resolves after a lock + * is released. + */ export const LOCK_POLL_INTERVAL_MS = 100; /** @@ -13,8 +30,9 @@ export const LOCK_POLL_INTERVAL_MS = 100; * - `doneResolved`: The `done` promise has been resolved (step can complete) * - `streamEnded`: The underlying stream has actually closed/errored * - * The pump continues running even after `doneResolved=true` to handle - * any future writes if the user acquires a new lock. + * Once `doneResolved` is set to true, the `done` promise will not resolve + * again. Re-acquiring locks after release is not supported as a way to + * trigger additional completion signaling. */ export interface FlushableStreamState extends PromiseWithResolvers { /** Number of write operations currently in flight to the server */ @@ -23,6 +41,10 @@ export interface FlushableStreamState extends PromiseWithResolvers { doneResolved: boolean; /** Whether the underlying stream has actually closed/errored */ streamEnded: boolean; + /** Interval ID for writable lock polling (if active) */ + writablePollingInterval?: ReturnType; + /** Interval ID for readable lock polling (if active) */ + readablePollingInterval?: ReturnType; } export function createFlushableState(): FlushableStreamState { @@ -43,15 +65,25 @@ export function createFlushableState(): FlushableStreamState { function isWritableUnlockedNotClosed(writable: WritableStream): boolean { if (writable.locked) return false; + let writer: WritableStreamDefaultWriter | undefined; try { // Try to acquire writer - if successful, stream is unlocked (not closed) - const writer = writable.getWriter(); - writer.releaseLock(); - return true; + writer = writable.getWriter(); } catch { // getWriter() throws if stream is closed/errored - let pump handle it return false; } + + try { + writer.releaseLock(); + } catch { + // If releaseLock() throws for any reason, conservatively treat the + // stream as closed/errored so callers don't assume it's safe to use. + // The pump will observe the failure via the stream's end state. + return false; + } + + return true; } /** @@ -60,15 +92,25 @@ function isWritableUnlockedNotClosed(writable: WritableStream): boolean { function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { if (readable.locked) return false; + let reader: ReadableStreamDefaultReader | undefined; try { // Try to acquire reader - if successful, stream is unlocked (not closed) - const reader = readable.getReader(); - reader.releaseLock(); - return true; + reader = readable.getReader(); } catch { // getReader() throws if stream is closed/errored - let pump handle it return false; } + + try { + reader.releaseLock(); + } catch { + // If releaseLock() throws for any reason, conservatively treat the + // stream as closed/errored so callers don't assume it's safe to use. + // The pump will observe the failure via the stream's end state. + return false; + } + + return true; } /** @@ -77,15 +119,24 @@ function isReadableUnlockedNotClosed(readable: ReadableStream): boolean { * * Note: Only resolves if stream is unlocked but NOT closed. If the user closes * the stream, the pump will handle resolution via the stream ending naturally. + * + * Protection: If polling is already active on this state, the existing interval + * is used to avoid creating multiple simultaneous polling operations. */ export function pollWritableLock( writable: WritableStream, state: FlushableStreamState ): void { + // Prevent multiple simultaneous polling on the same state + if (state.writablePollingInterval !== undefined) { + return; + } + const intervalId = setInterval(() => { // Stop polling if already resolved or stream ended if (state.doneResolved || state.streamEnded) { clearInterval(intervalId); + state.writablePollingInterval = undefined; return; } @@ -94,8 +145,11 @@ export function pollWritableLock( state.doneResolved = true; state.resolve(); clearInterval(intervalId); + state.writablePollingInterval = undefined; } }, LOCK_POLL_INTERVAL_MS); + + state.writablePollingInterval = intervalId; } /** @@ -104,15 +158,24 @@ export function pollWritableLock( * * Note: Only resolves if stream is unlocked but NOT closed. If the user closes * the stream, the pump will handle resolution via the stream ending naturally. + * + * Protection: If polling is already active on this state, the existing interval + * is used to avoid creating multiple simultaneous polling operations. */ export function pollReadableLock( readable: ReadableStream, state: FlushableStreamState ): void { + // Prevent multiple simultaneous polling on the same state + if (state.readablePollingInterval !== undefined) { + return; + } + const intervalId = setInterval(() => { // Stop polling if already resolved or stream ended if (state.doneResolved || state.streamEnded) { clearInterval(intervalId); + state.readablePollingInterval = undefined; return; } @@ -121,8 +184,11 @@ export function pollReadableLock( state.doneResolved = true; state.resolve(); clearInterval(intervalId); + state.readablePollingInterval = undefined; } }, LOCK_POLL_INTERVAL_MS); + + state.readablePollingInterval = intervalId; } /** @@ -155,6 +221,11 @@ export async function flushablePipe( // The important ops are writes to the sink (server) const readResult = await reader.read(); + // Check if stream has ended (e.g., due to error in another path) before processing + if (state.streamEnded) { + return; + } + if (readResult.done) { // Source stream completed - close sink and resolve state.streamEnded = true; @@ -174,11 +245,6 @@ export async function flushablePipe( } finally { state.pendingOps--; } - - // Check if stream has ended (e.g., due to error in another path) - if (state.streamEnded) { - return; - } } } catch (err) { state.streamEnded = true; @@ -186,6 +252,11 @@ export async function flushablePipe( state.doneResolved = true; state.reject(err); } + // Propagate error through flushablePipe's own promise as well. + // Callers that rely on the FlushableStreamState should use `state.promise`, + // while other callers may depend on this rejection. Some known callers + // explicitly ignore this rejection (`.catch(() => {})`) and rely solely + // on `state.reject(err)` for error handling. throw err; } finally { reader.releaseLock();