4 changes: 2 additions & 2 deletions docs/content/docs/foundations/streaming.mdx
@@ -474,11 +474,11 @@ async function uploadResult(stream: ReadableStream<Uint8Array>) {
When writing to a stream in a step function, there is an important contract to understand:

<Callout type="warn">
**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock.
**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it.
Copilot AI Jan 9, 2026
The sentence is quite long and has some redundancy. Consider simplifying to: "The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. Re-acquiring a lock on the same stream within the same step after it has been released is not supported."

Suggested change
**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock, and do not rely on re-acquiring a lock on the same stream within the same step after it has been released, even if internal implementation details might technically allow it.
**Once a lock is released, no further writes to that stream from that step are allowed.** The framework uses lock release as the signal that the step is done interacting with the stream. Make sure all writes are complete before releasing the lock. Re-acquiring a lock on the same stream within the same step after it has been released is not supported.

</Callout>

<Callout type="warn">
**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues.
**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; while explicitly closing the stream is safe and recommended for cleanup, it is not required to prevent the function from hanging.
Copilot AI Jan 9, 2026
The sentence structure is confusing with multiple nested clauses. Consider breaking this into two clearer sentences: "The lock MUST be released to prevent the function from hanging. If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; explicitly closing the stream is safe and recommended for cleanup but not required to prevent hanging."

Suggested change
**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient; while explicitly closing the stream is safe and recommended for cleanup, it is not required to prevent the function from hanging.
**The lock MUST be released to prevent the function from hanging.** If you acquire a lock but never release it, the serverless function will remain active until it times out, even after the step returns and the workflow continues. Releasing the lock is sufficient. Explicitly closing the stream is safe and recommended for cleanup, but it is not required to prevent the function from hanging.

</Callout>

**Correct pattern - complete all writes before releasing:**
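The full example is collapsed in this diff view. A minimal sketch of the pattern the callouts describe, using the standard Web Streams API, might look like the following; `writeAll` is a hypothetical helper, not part of the framework's documented API:

```typescript
// Sketch of the "complete all writes before releasing" contract:
// every write is awaited, and the lock is released exactly once,
// only after all writes have settled.
async function writeAll(
  writable: WritableStream<string>,
  chunks: string[],
): Promise<void> {
  const writer = writable.getWriter();
  try {
    // Await every write so nothing is still in flight at release time.
    for (const chunk of chunks) {
      await writer.write(chunk);
    }
  } finally {
    // Releasing the lock signals that this step is done with the stream;
    // no further writes from this step are allowed afterwards.
    writer.releaseLock();
  }
}
```

After `releaseLock()` returns, the stream is unlocked, so explicitly closing it for cleanup (for example via `writable.close()`) is still possible from the same scope.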
221 changes: 221 additions & 0 deletions packages/core/src/flushable-stream.test.ts
@@ -3,6 +3,7 @@ import {
createFlushableState,
flushablePipe,
LOCK_POLL_INTERVAL_MS,
pollReadableLock,
pollWritableLock,
} from './flushable-stream.js';

@@ -107,4 +108,224 @@ describe('flushable stream behavior', () => {
// Stream should be closed (user closed it)
expect(streamClosed).toBe(true);
});

it('should handle write errors during pipe operations', async () => {
const chunks: string[] = [];

// Create a sink that throws on write
const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
if (chunk === 'error') {
throw new Error('Write failed');
}
},
});

const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(readable, mockSink, state).catch(() => {
// Errors handled via state.reject
});

pollWritableLock(writable, state);

// Write data that will cause an error
const userWriter = writable.getWriter();
await userWriter.write('chunk1');
await userWriter.write('error');

// Wait for the error to propagate
await new Promise((r) => setTimeout(r, 50));

// The promise should be rejected
await expect(state.promise).rejects.toThrow('Write failed');

// First chunk should have been written before error
expect(chunks).toContain('chunk1');
});

  it('should resolve the flush promise when the source closes with pollReadableLock', async () => {
// Create a readable stream that we can control
let controller: ReadableStreamDefaultController<string>;
const source = new ReadableStream<string>({
start(c) {
controller = c;
},
});

const chunks: string[] = [];
const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
});

const state = createFlushableState();

// Start piping in background
flushablePipe(source, mockSink, state).catch(() => {
// Errors handled via state.reject
});

// Start polling for readable lock release
pollReadableLock(source, state);

// Enqueue some data and then close
controller?.enqueue('data1');
controller?.enqueue('data2');
controller?.close();

// Wait for the pipe to complete
await new Promise((r) => setTimeout(r, 100));

// The promise should resolve
await expect(state.promise).resolves.toBeUndefined();

// Chunks should have been written
expect(chunks).toContain('data1');
expect(chunks).toContain('data2');
});

it('should handle concurrent writes correctly', async () => {
const chunks: string[] = [];

const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
});

const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(readable, mockSink, state).catch(() => {
// Errors handled via state.reject
});

pollWritableLock(writable, state);

// Perform multiple concurrent writes
const userWriter = writable.getWriter();
await Promise.all([
userWriter.write('chunk1'),
userWriter.write('chunk2'),
userWriter.write('chunk3'),
]);

userWriter.releaseLock();

// Wait for polling to detect lock release
await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50));

// Promise should resolve
await expect(state.promise).resolves.toBeUndefined();

// All chunks should be written
expect(chunks).toHaveLength(3);
expect(chunks).toContain('chunk1');
expect(chunks).toContain('chunk2');
expect(chunks).toContain('chunk3');
});

it('should prevent multiple simultaneous polling operations on writable', async () => {
const { readable, writable } = new TransformStream<string, string>();
const mockSink = new WritableStream<string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(readable, mockSink, state).catch(() => {});

// Start polling multiple times
pollWritableLock(writable, state);
pollWritableLock(writable, state);
pollWritableLock(writable, state);

// Should only have one interval active
expect(state.writablePollingInterval).toBeDefined();

// Write and release to clean up
const userWriter = writable.getWriter();
await userWriter.write('data');
userWriter.releaseLock();

// Wait for cleanup
await new Promise((r) => setTimeout(r, LOCK_POLL_INTERVAL_MS + 50));
});
Comment on lines +234 to +257
Copilot AI Jan 9, 2026
The test validates that duplicate polling calls don't create multiple intervals, but it doesn't verify that the interval is actually cleaned up after the operation completes. Consider adding an assertion after the wait to check that state.writablePollingInterval is undefined after cleanup, which would confirm the interval was properly cleared.


it('should prevent multiple simultaneous polling operations on readable', async () => {
let controller: ReadableStreamDefaultController<string>;
const source = new ReadableStream<string>({
start(c) {
controller = c;
},
});

const mockSink = new WritableStream<string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(source, mockSink, state).catch(() => {});

// Start polling multiple times
pollReadableLock(source, state);
pollReadableLock(source, state);
pollReadableLock(source, state);

// Should only have one interval active
expect(state.readablePollingInterval).toBeDefined();

// Close to clean up
controller?.close();

// Wait for cleanup
await new Promise((r) => setTimeout(r, 100));
});
Comment on lines +259 to +286
Copilot AI Jan 9, 2026
The test validates that duplicate polling calls don't create multiple intervals, but it doesn't verify that the interval is actually cleaned up after the operation completes. Consider adding an assertion after the wait to check that state.readablePollingInterval is undefined after cleanup, which would confirm the interval was properly cleared.


it('should handle stream ending while pending operations are in flight', async () => {
const chunks: string[] = [];
let writeDelay = 0;

const mockSink = new WritableStream<string>({
async write(chunk) {
// Simulate slow write
await new Promise((r) => setTimeout(r, writeDelay));
chunks.push(chunk);
},
});

const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();

// Start piping in background
flushablePipe(readable, mockSink, state).catch(() => {});

pollWritableLock(writable, state);

const userWriter = writable.getWriter();

// Write first chunk normally
await userWriter.write('fast');

// Set delay for next write
writeDelay = 100;

// Start slow write and immediately close
const slowWrite = userWriter.write('slow');
await userWriter.close();

// Wait for everything to complete
await slowWrite;
await new Promise((r) => setTimeout(r, 150));

// Promise should resolve
await expect(state.promise).resolves.toBeUndefined();

// Both chunks should have been written
expect(chunks).toContain('fast');
expect(chunks).toContain('slow');
});
});